Я хочу создать рабочий процесс DAG с 5 различными ветвями следующим образом:
База DAG - это:
from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import DagRun
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.dummy import DummyOperator
def get_path(**kwargs):
params = kwargs.get('params',{})
if params.get('path') == '1':
return 'task_2_a'
elif params.get('path') == '2':
return 'task_2_b'
elif params.get('path') == '3':
return 'task_2_c'
elif params.get('path') == '4':
return ['task_2_a','task_2_b']
else:
return ['task_2_a','task_2_b', 'task_2_c']
with DAG(
'test',
description='test',
tags=["test"],
schedule_interval=None,
start_date=datetime(2025, 7, 1),
default_args={
'retries': 0,
'retry_delay': timedelta(minutes=1),
'conn_id': 'sgk_gp'
},
params={
'name':'',
'path':''
}
) as dag:
task_1 = SQLExecuteQueryOperator(
task_id='task_1',
sql=f"""
drop table if exists {{{{dag_run.conf.name}}}};
create table {{{{dag_run.conf.name}}}} (
some_text character varying
)
"""
)
branch_1 = BranchPythonOperator(
task_id='branch_1',
python_callable=get_path,
provide_context=True,
do_xcom_push=False
)
task_2_a = SQLExecuteQueryOperator(
task_id='task_2_a',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select 'aaa' as some_text) as tab
"""
)
task_2_b = SQLExecuteQueryOperator(
task_id='task_2_b',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select 'bbb' as some_text) as tab
"""
)
task_2_c = SQLExecuteQueryOperator(
task_id='task_2_c',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select 'ccc' as some_text) as tab
"""
)
task_3 = SQLExecuteQueryOperator(
task_id='task_3',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select '333' as some_text) as tab
"""
)
complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)
< /code>
Я попытался установить рабочий процесс, подобный этому изначально: < /p>
task_1 >> branch_1 >> [task_2_a >> task_2_b >> task_2_c] >> task_3 >> complete
< /code>
Но это приведет к тому, что Task_2 выполняется параллельно, если были выбраны филиалы 4 или 5, и мне нужно, чтобы они выполнялись строго по порядку. task_1 >> branch_1 >> task_2_a >> task_3 >> complete
task_1 >> branch_1 >> task_2_b >> task_3 >> complete
task_1 >> branch_1 >> task_2_c >> task_3 >> complete
task_1 >> branch_1 >> task_2_a >> task_2_b >> task_3 >> complete
task_1 >> branch_1 >> task_2_a >> task_2_b >> task_2_c >> task_3 >> complete
< /code>
У меня была идея реализовать несколько операторов филиала, но это, на мой взгляд, очень запутанная структура. Есть простой способ достичь этого?
Подробнее здесь: https://stackoverflow.com/questions/797 ... in-airflow
Как изменить заказ A и количество задач, которые будут выполнены в воздушном потоке ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf
Anonymous » » в форуме Python - 0 Ответы
- 18 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf
Anonymous » » в форуме Python - 0 Ответы
- 20 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Получите список отмененных задач, которые были выполнены, когда исполнитель был закрыт
Anonymous » » в форуме JAVA - 0 Ответы
- 3 Просмотры
-
Последнее сообщение Anonymous
-