Как выполнить динамическое сопоставление задач на уровне TaskGroup в Airflow с помощью операторов?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как выполнить динамическое сопоставление задач на уровне TaskGroup в Airflow с помощью операторов?

Сообщение Anonymous »

Тело:
Я пытаюсь реализовать динамическое сопоставление задач в Apache Airflow, используя TaskGroup, включающий такие операторы, как SQLCheckOperator и SQLExecuteQueryOperator. Мне удалось использовать динамическое сопоставление задач на уровне отдельных задач внутри группы, но я не уверен, как добиться динамического сопоставления задач непосредственно на уровне TaskGroup.
Вот рабочий пример с динамическим сопоставление задач для отдельных задач:

Код: Выделить всё

with TaskGroup('check_group') as check:
t1 = SQLCheckOperator.partial(
task_id='check1',
sql='SELECT count(*)> 0 FROM data_table WHERE id = {{ task.parameters["id"] }}'
).expand(parameters=task_id.output)

t2 = SQLExecuteQueryOperator.partial(
task_id='update1',
map_index_template="{{ task.parameters['id'] }}",
sql='''UPDATE table SET date = NOW() WHERE id = {{ task.parameters["id"] }} '''
).expand(parameters=task_id.output)
У меня также есть подобная рабочая задача, которая динамически извлекает данные для Task_id:

Код: Выделить всё

task_id = SQLExecuteQueryOperator(
task_id='scope',
sql='''SELECT DISTINCT id FROM idtable''',
handler=lambda x: [dict(zip(['id'], row)) for row in fetch_all_handler(x)]
)
Это отлично работает, и я могу выполнять динамическое сопоставление задач на уровне задач для каждой задачи в группе.
Проблема:
Я хочу реализовать динамическое сопоставление задач на уровне TaskGroup, как показано в примере ниже, с использованием декоратора @task_group:

Код: Выделить всё

@task_group(group_id="group1")
def tg1(my_num):
@task
def print_num(num):
return num

@task
def add_42(num):
return num + 42

print_num(my_num) >> add_42(my_num)

# Creating 6 mapped task group instances of the task group group1
tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])
В этом примере я могу добиться динамического сопоставления задач, передавая значения задачам, украшенным @task, но я не могу воспроизвести это напрямую, используя такие операторы, как SQLCheckOperator или SQLExecuteQueryOperator в TaskGroup.
Так, например, такой код не работает, но соответствует моей идее динамического сопоставления задач в группе с оператором внутри. Так, например, такой код не работает, но соответствует моей идее динамического отображения задач в группе с оператором внутри.

Код: Выделить всё

@task_group(group_id="group1")
def tg1(id):
# Task 1: Check if the condition is met
def check1(id):
return SQLCheckOperator(
task_id='check1',
sql='SELECT count(*) > 0 FROM data_table WHERE id = {{ task.parameters["id"] }}'
)

# Task 2: Update the table if condition is met
def update1(id):
return SQLExecuteQueryOperator(
task_id='update1',
map_index_template="{{ task.parameters['id'] }}",
sql='''UPDATE table SET date = NOW() WHERE id = {{ task.parameters["id"] }}'''
)

# Set task dependencies
check1(id) >> update1(id)

# Create 6 instances of the task group group1 with dynamic task mapping
tg1_object = tg1.expand(id=[19, 23, 42, 8, 7, 108])
Я получил эту ошибку, когда попробовал это решение

Код: Выделить всё

MappedArgument(_input=ListOfDictsExpandInput(value=XComArg()), _key='id')'
Вопрос:
Как добиться динамического сопоставления задач на уровне TaskGroup с помощью операторов (вместо декоратора @task)? Можно ли использоватьexpand() или другой механизм для достижения такого поведения непосредственно внутри TaskGroup?
Я хотел бы, насколько это возможно, избегать использования цикла.
Я хочу чтобы найти решение моей проблемы, а именно найти способ создать динамическое сопоставление задач для группы задач с оператором внутри

Подробнее здесь: https://stackoverflow.com/questions/792 ... using-oper
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»