Я пытаюсь реализовать динамическое сопоставление задач в Apache Airflow, используя TaskGroup, включающую такие операторы, как SQLCheckOperator и SQLExecuteQueryOperator. Мне удалось использовать динамическое сопоставление задач на уровне отдельных задач внутри группы, но я не уверен, как добиться динамического сопоставления задач непосредственно на уровне TaskGroup.
Вот рабочий пример с динамическим сопоставление задач для отдельных задач:
Код: Выделить всё
with TaskGroup('check_group') as check:
t1 = SQLCheckOperator.partial(
task_id='check1',
sql='SELECT count(*)> 0 FROM data_table WHERE id = {{ task.parameters["id"] }}'
).expand(parameters=task_id.output)
t2 = SQLExecuteQueryOperator.partial(
task_id='update1',
map_index_template="{{ task.parameters['id'] }}",
sql='''UPDATE table SET date = NOW() WHERE id = {{ task.parameters["id"] }} '''
).expand(parameters=task_id.output)
Код: Выделить всё
task_id = SQLExecuteQueryOperator(
task_id='scope',
sql='''SELECT DISTINCT id FROM idtable''',
handler=lambda x: [dict(zip(['id'], row)) for row in fetch_all_handler(x)]
)
Проблема:
Я хочу реализовать динамическое сопоставление задач на уровне TaskGroup, как показано в примере ниже, с использованием декоратора @task_group:
Код: Выделить всё
@task_group(group_id="group1")
def tg1(my_num):
@task
def print_num(num):
return num
@task
def add_42(num):
return num + 42
print_num(my_num) >> add_42(my_num)
# Creating 6 mapped task group instances of the task group group1
tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])
Так, например, такой код не работает, но соответствует моей идее динамического сопоставления задач в группе с оператором внутри. Так, например, такой код не работает, но соответствует моей идее динамического отображения задач в группе с оператором внутри.
Код: Выделить всё
@task_group(group_id="group1")
def tg1(id):
# Task 1: Check if the condition is met
def check1(id):
return SQLCheckOperator(
task_id='check1',
sql='SELECT count(*) > 0 FROM data_table WHERE id = {{ task.parameters["id"] }}'
)
# Task 2: Update the table if condition is met
def update1(id):
return SQLExecuteQueryOperator(
task_id='update1',
map_index_template="{{ task.parameters['id'] }}",
sql='''UPDATE table SET date = NOW() WHERE id = {{ task.parameters["id"] }}'''
)
# Set task dependencies
check1(id) >> update1(id)
# Create 6 instances of the task group group1 with dynamic task mapping
tg1_object = tg1.expand(id=[19, 23, 42, 8, 7, 108])
Код: Выделить всё
MappedArgument(_input=ListOfDictsExpandInput(value=XComArg()), _key='id')'
Как добиться динамического сопоставления задач на уровне TaskGroup с помощью операторов (вместо декоратора @task)? Можно ли использоватьexpand() или другой механизм для достижения такого поведения непосредственно внутри TaskGroup?
Я хотел бы, насколько это возможно, избегать использования цикла.
Я хочу чтобы найти решение моей проблемы, а именно найти способ создать динамическое сопоставление задач для группы задач с оператором внутри
Подробнее здесь: https://stackoverflow.com/questions/792 ... using-oper