у меня есть следующий воздушный поток dag
/p>
Я могу правильно определить зависимости в DAG выше, потому что я жестко кодировал список элементов (упомянутый в фрагменте кода). Но предположим, что я хочу, чтобы список элементов был взят из конфигурации DAG RAG Airflow DAG (во время выполнения), а затем был создан DAG, как бы я это достигнул? Я прошел через n количества статей, но все еще не могу найти способ включить список элементов от DAG_RUN.Conf. Может кто -нибудь, пожалуйста, помогите здесь, так как я заблокирован и не вижу способа выйти из этого < /p>
elements = [
[["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
[["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
[["s", "t"], ["u", "v"], ["w", "x"]],
[["y", "z"]]
]
Соответствующий код
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
@task
def process_element(element: str) -> str:
"""
Task to process an individual element.
"""
print(f"Processing element {element}")
return f"Processed {element}"
dag = DAG(
'process_elements_dag',
description='DAG for processing elements with dynamic dependencies',
schedule_interval=None,
start_date=datetime(2025, 1, 28),
)
elements = [
[["a", "b"], ["c", "d"], ["e", "f"], ["g", "h"], ["i", "j"]],
[["k", "l"], ["m", "n"], ["o", "p"], ["q", "r"]],
[["s", "t"], ["u", "v"], ["w", "x"]],
[["y", "z"]]
]
with dag:
for element_idx, element in enumerate(elements):
task_groups = [
[process_element.override(task_id=f"process_element_{element}")(element) for element in group]
for group in element
]
# Set dependencies sequentially between groups
for i in range(len(task_groups) - 1):
for upstream_task in task_groups:
for downstream_task in task_groups[i + 1]:
downstream_task.set_upstream(upstream_task)
Подробнее здесь: https://stackoverflow.com/questions/793 ... g-run-conf
Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf
Anonymous » » в форуме Python - 0 Ответы
- 16 Просмотры
-
Последнее сообщение Anonymous
-