Как установить начальное состояние для группы задач воздушного потокаPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как установить начальное состояние для группы задач воздушного потока

Сообщение Anonymous »

У меня есть следующая структура DAG: < /p>
from datetime import datetime, timedelta
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
'irb_test',
description='irb_test',
tags=["irb_test"],
schedule_interval=None,
start_date=datetime(2025, 7, 1),
default_args={
'retries': 0,
'retry_delay': timedelta(minutes=1),
'conn_id': 'sgk_gp'
}
) as dag:

with TaskGroup('test_1') as test_1:
for i in range(15):
task_1_i = SQLExecuteQueryOperator(
task_id=f'task_1_{i}',
sql=f"""
select pg_sleep({i})
"""
)
with TaskGroup('test_2') as test_2:
for i in range(15):
task_2_i = SQLExecuteQueryOperator(
task_id=f'task_2_{i}',
sql=f"""
select pg_sleep({i})
"""
)

for i in range(15):
test_1.get_child_by_label(f'task_1_{i}')\
>> test_2.get_child_by_label(f'task_2_{i}')
< /code>
Я хотел бы запустить группу задач Test_2 только после того, как преуспели в 10 задачах в первой группе задач. Я знаю, что могу переписать заказ на выполнение вручную, но есть ли более элегантный способ сделать это?

Подробнее здесь: https://stackoverflow.com/questions/797 ... task-group
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»