Как изменить заказ A и количество задач, которые будут выполнены в воздушном потокеPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как изменить заказ A и количество задач, которые будут выполнены в воздушном потоке

Сообщение Anonymous »

Я хочу создать рабочий процесс DAG с 5 различными ветвями следующим образом:

База DAG - это:
from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import DagRun
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.dummy import DummyOperator

def get_path(**kwargs):
params = kwargs.get('params',{})
if params.get('path') == '1':
return 'task_2_a'
elif params.get('path') == '2':
return 'task_2_b'
elif params.get('path') == '3':
return 'task_2_c'
elif params.get('path') == '4':
return ['task_2_a','task_2_b']
else:
return ['task_2_a','task_2_b', 'task_2_c']

with DAG(
'test',
description='test',
tags=["test"],
schedule_interval=None,
start_date=datetime(2025, 7, 1),
default_args={
'retries': 0,
'retry_delay': timedelta(minutes=1),
'conn_id': 'sgk_gp'
},
params={
'name':'',
'path':''
}
) as dag:

task_1 = SQLExecuteQueryOperator(
task_id='task_1',
sql=f"""
drop table if exists {{{{dag_run.conf.name}}}};
create table {{{{dag_run.conf.name}}}} (
some_text character varying
)
"""
)

branch_1 = BranchPythonOperator(
task_id='branch_1',
python_callable=get_path,
provide_context=True,
do_xcom_push=False
)

task_2_a = SQLExecuteQueryOperator(
task_id='task_2_a',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select 'aaa' as some_text) as tab
"""
)

task_2_b = SQLExecuteQueryOperator(
task_id='task_2_b',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select 'bbb' as some_text) as tab
"""
)

task_2_c = SQLExecuteQueryOperator(
task_id='task_2_c',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select 'ccc' as some_text) as tab
"""
)

task_3 = SQLExecuteQueryOperator(
task_id='task_3',
sql=f"""
insert into {{{{dag_run.conf.name}}}} (some_text)
select some_text from (select '333' as some_text) as tab
"""
)

complete = DummyOperator(task_id="complete", trigger_rule=TriggerRule.NONE_FAILED)
< /code>
Я попытался установить рабочий процесс, подобный этому изначально: < /p>
task_1 >> branch_1 >> [task_2_a >> task_2_b >> task_2_c] >> task_3 >> complete
< /code>
Но это приведет к тому, что Task_2 выполняется параллельно, если были выбраны филиалы 4 или 5, и мне нужно, чтобы они выполнялись строго по порядку. task_1 >> branch_1 >> task_2_a >> task_3 >> complete
task_1 >> branch_1 >> task_2_b >> task_3 >> complete
task_1 >> branch_1 >> task_2_c >> task_3 >> complete
task_1 >> branch_1 >> task_2_a >> task_2_b >> task_3 >> complete
task_1 >> branch_1 >> task_2_a >> task_2_b >> task_2_c >> task_3 >> complete
< /code>
У меня была идея реализовать несколько операторов филиала, но это, на мой взгляд, очень запутанная структура. Есть простой способ достичь этого?

Подробнее здесь: https://stackoverflow.com/questions/797 ... in-airflow
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»