Динамическое секвенирование задач в воздушном потоке через DAG Conf ConfPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf

Сообщение Anonymous »

у меня есть следующий воздушный поток dag
/p>
Я могу правильно определить зависимости в DAG выше, потому что я жестко кодировал список элементов (упомянутый в фрагменте кода). Но предположим, что я хочу, чтобы список элементов был взят из конфигурации DAG RAG Airflow DAG (во время выполнения), а затем был создан DAG, как бы я это достигнул? Я прошел через n количества статей, но все еще не могу найти способ включить список элементов от DAG_RUN.Conf. Может кто -нибудь, пожалуйста, помогите здесь, так как я заблокирован и не вижу способа выйти из этого < /p>
elements = [
[["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
[["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
[["s", "t"], ["u", "v"], ["w", "x"]],
[["y", "z"]]
]

Соответствующий код
from airflow import DAG
from airflow.decorators import task
from datetime import datetime

@task
def process_element(element: str) -> str:
"""
Task to process an individual element.
"""
print(f"Processing element {element}")
return f"Processed {element}"

dag = DAG(
'process_elements_dag',
description='DAG for processing elements with dynamic dependencies',
schedule_interval=None,
start_date=datetime(2025, 1, 28),
)

elements = [
[["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
[["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
[["s", "t"], ["u", "v"], ["w", "x"]],
[["y", "z"]]
]

with dag:
for element_idx, element in enumerate(elements):
task_groups = [
[process_element.override(task_id=f"process_element_{element}")(element) for element in group]
for group in element
]

# Set dependencies sequentially between groups
for i in range(len(task_groups) - 1):
for upstream_task in task_groups:
for downstream_task in task_groups[i + 1]:
downstream_task.set_upstream(upstream_task)


Подробнее здесь: https://stackoverflow.com/questions/793 ... g-run-conf
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • В воздушном потоке 2.10 Могу ли я использовать динамическое отображение задач с BranchPythonoperator?
    Anonymous » » в форуме Python
    0 Ответы
    1 Просмотры
    Последнее сообщение Anonymous
  • Задача настройки в воздушном потоке DAG
    Anonymous » » в форуме Python
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Задача настройки в воздушном потоке DAG
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Есть ли страница Dag Devingecy или аналог в воздушном потоке 3.0
    Anonymous » » в форуме Python
    0 Ответы
    0 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»