[Airflow]: динамическое сопоставление задач в DockerOperator с использованием XcomsPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 [Airflow]: динамическое сопоставление задач в DockerOperator с использованием Xcoms

Сообщение Anonymous »

Я создаю даг, который должен делать следующее:
  • извлекать идентификаторы событий
  • для каждого идентификатора события , получить сведения о событии ( DockerOperator )
Приведенный ниже код — моя попытка сделать то, что я хочу:

Код: Выделить всё

from datetime import datetime

from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator

With Dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
):
task_fetch_ids = PythonOperator(
task_id="fetch_detail",
...)

task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)

task_fetch_ids >> task_fetch_detail

Вышеописанное явно не работает, поскольку я перебираю строку в цикле.
Какой синтаксис правильный?


Подробнее здесь: https://stackoverflow.com/questions/791 ... sing-xcoms
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»