Я могу создавать динамические задачи и пытаюсь управлять последовательной обработкой, используя max_active_tis_per_dag=1. Весь даг должен завершиться сбоем в момент сбоя сопоставленной задачи.
Простой пример:
Зависимости:
Задача1 (выход = список)-> Сопоставленная задача2(0) -> Сопоставленная задача(1) -> Сопоставленная задача(2)
если сопоставленная задача 0 завершается сбоем, то сопоставленная задача 1 и сопоставленная задача 2 не запускается (автоматический сбой)
если сопоставленная задача 0 выполнена успешно, а сопоставленная задача 1 завершается с ошибкой, то сопоставленная задача 2 завершается сбоем
Код: Выделить всё
@dag(schedule=None, start_date=pendulum.datetime(2024, 5, 21, 12, 00, 00), catchup=False)
def get_execution_steps():
1) Gets steps (list of stored procedures) to execute from big query table
2) Sorts steps into the desired processing order. Desire is to process step 1 through x (length of list from step 1).
3) Expand tasks so each stored procedure can be seen running as an individual task with the ability to retrigger a dag from point of failure on without rerunning prior stored procedures.
@task()
def get_and_sort_tasks():
client=bigquery.Client()
unsorted_task_list = client.query(bq_steps_sql, location='US')
logger.info(repr(unsorted_task_list.result()))
sorted_steps_list = sorted(unsorted_task_list, key=lambda x: (int(x[0]), int(x[1])))
logger.info(sorted_steps_list)
sorted_task_list=[]
for step in sorted_steps_list:
execution_task_name = 'Execute_statement_task{}_subtask{}'.format(step[0], step[1])
sql=step[2]
logger.info([execution_task_name, sql])
sorted_task_list.append([execution_task_name, sql])
@task(max_active_tis_per_dag=1, task_id= 'execute_statement_task', map_index_template="{{task_name}}")
def execute_statements(sorted_task_list):
logger.info(sorted_task_list)
context = get_current_context()
context["task_name"] = sorted_task_list[0]
logger.info('Running:{}'.format(sorted_task_list[1]))
client=bigquery.Client()
query=sorted_task_list[1]
job = client.query(query, location='US')
logger.info(repr(job.result()))
tasks = execute_statements.partial().expand(sorted_task_list=get_and_sort_tasks())
Trigger_rules, похоже, находятся на более высоком уровне задачи. Я попытался установить подчиненную задачу с помощью триггера_rule='one_failed', который фактически срабатывает, если, например, сопоставленная задача 0 завершается сбоем, но каждая отдельная сопоставленная задача 1 и сопоставленная задача 2 продолжают обрабатываться.
Код: Выделить всё
@task(task_id='task_exception', trigger_rule=('one_failed')
def task_exception():
raise AirflowFailException('Dynamic Statement Failed')
https://airflow.apache.org/docs/apache- ... ng/dynamic -task-mapping.html
https://airflow.apache.org/docs/apache- ... index.html
Подробнее здесь: https://stackoverflow.com/questions/785 ... gst-mapped