В воздушном потоке 2.10 Могу ли я использовать динамическое отображение задач с BranchPythonoperator?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 В воздушном потоке 2.10 Могу ли я использовать динамическое отображение задач с BranchPythonoperator?

Сообщение Anonymous »

Ниже приведена минимальная реализация оператора филиала, использующего API Taskflow. ODD_TASK или evel_task также будет использовать значение return_int , на котором было принято решение о ветвлении. Затем будет выполнена final_task . Все прямолинейно.

Код: Выделить всё

from airflow.decorators import dag, task

@dag(dag_id='Example_Dag_Simple_Branch')
def simple_dag():

@task(task_id='return_int')
def return_int():
return 3

@task.branch(task_id='branch_on_condition')
def branch_on_condition(upstream_value):
if upstream_value & 1:
return 'odd_task'
else:
return 'even_task'

@task(task_id='odd_task')
def odd_task(input_val):
print(f"{input_val} is an odd number")
return input_val

@task(task_id='even_task')
def even_task(input_val):
print(f"{input_val} is an even number")
return input_val

@task(task_id='final_task', trigger_rule='one_success')
def final_task():
print('final task executed')
return

returned_int = return_int()
branch_value = branch_on_condition(upstream_value=returned_int)
even_task_return = even_task(input_val=returned_int)
odd_task_return = odd_task(input_val=returned_int)
final_return = final_task()

branch_value >> [even_task_return, odd_task_return] >> final_return

simple_dag()
Это журналы либо info - 3 - нечетное число или info - 2 - равномерное число .
Почему я не могу реализовать аналогичный шаблон, используя динамические задачи? Перечислите нормально, но обе ветви выполняются!

Код: Выделить всё

from airflow.decorators import dag, task

@dag(dag_id='Example_Dag_Dynamic_Branch')
def simple_dag():

@task(task_id='return_list')
def return_list():
return [1,2,3,4,5,6,7]

@task.branch(task_id='branch_on_condition')
def branch_on_condition(upstream_value):
if upstream_value & 1:
return 'odd_task'
else:
return 'even_task'

@task(task_id='odd_task')
def odd_task(input_val):
print(f"{input_val} is an odd number")
return input_val

@task(task_id='even_task')
def even_task(input_val):
print(f"{input_val} is an even number")
return input_val

@task(task_id='final_task', trigger_rule='one_success')
def final_task():
print('final task executed')
return

returned_list = return_list()
branch_value = branch_on_condition.expand(upstream_value=returned_list)
even_task_return = even_task.expand(input_val=returned_list)
odd_task_return = odd_task.expand(input_val=returned_list)
final_return = final_task()

branch_value >> [even_task_return, odd_task_return] >> final_return

simple_dag()
И поэтому выход odd_task , например, 7 задач, все, что говорит, что информация - n - нечетное число для N 1–7, что, конечно, не так. /> [*] Объединяется с .expand () и .partial ()
[*] Чтение связанных вопросов: 1. 2. 3.

Ни один из них действительно не ответит на вопрос! Но на самом деле поведение, если еще может стать довольно сложным и может быть приятно разделить на несколько задач.

Подробнее здесь: https://stackoverflow.com/questions/795 ... onoperator
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Динамическое секвенирование задач в воздушном потоке через DAG Conf Conf
    Anonymous » » в форуме Python
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Получение зависимостей задач оператора в воздушном потоке
    Anonymous » » в форуме Python
    0 Ответы
    25 Просмотры
    Последнее сообщение Anonymous
  • Airflow Branchpythonoperator работает неправильно, когда вы называете следующими способами в DAG
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Как можно использовать хранилище hashicorp в воздушном потоке?
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»