[Airflow]: применение динамического сопоставления задач к контейнеру докеров с помощью Xcoms.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 [Airflow]: применение динамического сопоставления задач к контейнеру докеров с помощью Xcoms.

Сообщение Anonymous »

Я создаю даг, который должен делать следующее:
  • извлекать идентификаторы событий
  • для каждого идентификатора события , получить сведения о событии
Два шага используют контейнер докеров, так как я не хочу использовать PythonOperator, поскольку мне хотелось бы развернуть проект в в какой-то момент и не хочу, чтобы обработка выполнялась в том же экземпляре, где работает Airflow.
Приведенный ниже код — моя попытка сделать то, что я хочу:

Код: Выделить всё

import re
from datetime import datetime

from airflow.decorators import dag, task
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.models.baseoperator import chain
from airflow.operators.python import PythonOperator
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.providers.slack.notifications.slack import SlackNotifier
from airflow.sensors.base import PokeReturnValue

leagues = ["league1", "league2", "league3"]

def extract_event_ids_from_logs(ti, previous_task):
logs = ti.xcom_pull(key="return_value", task_ids=previous_task)

match = re.search(r"\[([0-9, ]+)\]", logs)
if match:
a = [int(id.strip()) for id in match.group(1).split(",")]
return a
return []

def merge_ids(ti, tasks):
ids = []
for ti_ in tasks:
val = ti.xcom_pull(key="return_value", task_ids=ti_)
ids.extend(val)
return ids

@dag(
start_date=datetime(2024, 11, 1),
schedule="@daily",
catchup=False,
tags=["scraping"],
)
def scraping():
tasks = []

for league in leagues:
fetch_league_events = DockerOperator(
task_id=f"fetch_{league}_events",
max_active_tis_per_dag=1,
image="image:v1",
command=f"fetch-event-ids-from-league --league {league}",
api_version="auto",
auto_remove=True,
tty=True,
xcom_all=False,
mount_tmp_dir=False,
)
extract_ids = PythonOperator(
task_id=f"extract_ids_{league}",
python_callable=extract_event_ids_from_logs,
op_kwargs={"previous_task": fetch_league_events.task_id},
)

fetch_league_events >> extract_ids
tasks.append(extract_ids)

merge_task = PythonOperator(
task_id="merge_ids",
python_callable=merge_ids,
op_kwargs={"tasks": [o.task_id for o in tasks]},
# provide_context=False,
)
for task_ in tasks:
task_ >> merge_task

fetch_event_details = DockerOperator(
task_id="fetch_event_details",
image="image:v1",
).expand(
command=[f"fetch-event --event-id  {event_id}" for event_id in "{{ ti.xcom_pull(task_ids='merge_ids', key='return_value') }}"]
)

merge_task >> fetch_event_details

scraping()

Вышеописанное явно не работает, потому что я перебираю строку.
Каков правильный синтаксис для выполнения того, чего я пытаюсь достичь?
Я также пробовал разные вещи, например, я пытался создать оператор Python, который извлекает значения XComs и выполняет цикл с помощью оператора Docker. Я не получал никаких ошибок, но задачи Docker не запускались.

Код: Выделить всё

def fetch_event_using_docker(ti):
event_ids = ti.xcom_pull(task_ids="merge_ids", key="return_value")

for event_id in event_ids:
action = DockerOperator(
task_id=f"fetch_event_{event_id}",
image="image:v1",
api_version="auto",
auto_remove=True,
tty=True,
xcom_all=False,
mount_tmp_dir=False,
max_active_tis_per_dag=1,
command=f"fetch-event --event-id {event_id}",
)

// ...  existing code ...
fetch_task = PythonOperator(
task_id="fetch_events",
python_callable=fetch_event_using_docker,
)

merge_task >> fetch_task
Я пытался поискать в Интернете, но не нашел ничего полезного. Я нашел эту тему, в которой говорилось, что не рекомендуется использовать xcoms для создания дага (здесь). Я понимаю логику, но какой шаблон проектирования будет правильным для достижения того, чего я пытаюсь достичь.


Подробнее здесь: https://stackoverflow.com/questions/791 ... sing-xcoms
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»