[Airflow]: динамическое сопоставление задач в DockerOperator с использованием XcomsPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 [Airflow]: динамическое сопоставление задач в DockerOperator с использованием Xcoms

Сообщение Anonymous »

Я создаю даг, который должен делать следующее:
  • извлекать идентификаторы событий
  • для каждого идентификатора события , получить сведения о событии ( DockerOperator )
Приведенный ниже код — моя попытка сделать то, что я хочу:

Код: Выделить всё

import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue

leagues = ["league1", "league2", "league3"]

@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
)
task_fetch_ids = PythonOperator(
task_id="fetch_detail",
...)

task_fetch_detail = DockerOperator(
task_id="fetch_detail",
image="image:v1",
).expand(
command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='task_fetch_ids', key='return_value') }}"]
)

task_fetch_ids >> task_fetch_detail

Вышеописанное явно не работает, поскольку я перебираю строку в цикле.
Какой синтаксис правильный?


Подробнее здесь: https://stackoverflow.com/questions/791 ... sing-xcoms
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»