Динамическое сопоставление задач в Airflow — установка последовательных зависимостей между сопоставленными задачами.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Динамическое сопоставление задач в Airflow — установка последовательных зависимостей между сопоставленными задачами.

Сообщение Anonymous »

У меня есть даг, который запрашивает таблицу в bigquery, которая возвращает список хранимых процедур для выполнения, который может различаться как по количеству хранимых процедур, так и по фактической хранимой процедуре, необходимой для вызова конкретного запуска конвейера. p>
Я могу создавать динамические задачи и пытаюсь управлять последовательной обработкой, используя max_active_tis_per_dag=1. Весь даг должен завершиться сбоем в момент сбоя сопоставленной задачи.
Простой пример:
Зависимости:
Задача1 (выход = список)-> Сопоставленная задача2(0) -> Сопоставленная задача(1) -> Сопоставленная задача(2)
если сопоставленная задача 0 завершается сбоем, то сопоставленная задача 1 и сопоставленная задача 2 не запускается (автоматический сбой)
если сопоставленная задача 0 выполнена успешно, а сопоставленная задача 1 завершается с ошибкой, то сопоставленная задача 2 завершается сбоем

Код: Выделить всё

@dag(schedule=None, start_date=pendulum.datetime(2024, 5, 21, 12, 00, 00), catchup=False)
def get_execution_steps():
1) Gets steps (list of stored procedures) to execute from big query table
2) Sorts steps into the desired processing order.  Desire is to process step 1 through x (length of list from step 1).
3) Expand tasks so each stored procedure can be seen running as an individual task with the ability to retrigger a dag from point of failure on without rerunning prior stored procedures.

@task()
def get_and_sort_tasks():
client=bigquery.Client()
unsorted_task_list = client.query(bq_steps_sql, location='US')
logger.info(repr(unsorted_task_list.result()))
sorted_steps_list = sorted(unsorted_task_list, key=lambda x: (int(x[0]), int(x[1])))
logger.info(sorted_steps_list)
sorted_task_list=[]
for step in sorted_steps_list:
execution_task_name = 'Execute_statement_task{}_subtask{}'.format(step[0], step[1])
sql=step[2]
logger.info([execution_task_name, sql])
sorted_task_list.append([execution_task_name, sql])

@task(max_active_tis_per_dag=1,  task_id= 'execute_statement_task', map_index_template="{{task_name}}")
def execute_statements(sorted_task_list):
logger.info(sorted_task_list)
context = get_current_context()
context["task_name"] = sorted_task_list[0]
logger.info('Running:{}'.format(sorted_task_list[1]))
client=bigquery.Client()
query=sorted_task_list[1]
job = client.query(query, location='US')
logger.info(repr(job.result()))

tasks = execute_statements.partial().expand(sorted_task_list=get_and_sort_tasks())

Элементы, которые я пробовал:
Trigger_rules, похоже, находятся на более высоком уровне задачи. Я попытался установить подчиненную задачу с помощью триггера_rule='one_failed', который фактически срабатывает, если, например, сопоставленная задача 0 завершается сбоем, но каждая отдельная сопоставленная задача 1 и сопоставленная задача 2 продолжают обрабатываться.

Код: Выделить всё

@task(task_id='task_exception', trigger_rule=('one_failed')
def task_exception():
raise AirflowFailException('Dynamic Statement Failed')
Ссылки, которые я просмотрел здесь и здесь.
https://airflow.apache.org/docs/apache- ... ng/dynamic -task-mapping.html
https://airflow.apache.org/docs/apache- ... index.html

Подробнее здесь: https://stackoverflow.com/questions/785 ... gst-mapped
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»