Airflow Branchpythonoperator работает неправильно, когда вы называете следующими способами в DAGPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Airflow Branchpythonoperator работает неправильно, когда вы называете следующими способами в DAG

Сообщение Anonymous »

Я реализовал следующий конвейер:
DAG Graph
Как видно из графика, "Tracker" и "tracker_second" - branchpythonoperators . Я передаю им список задач, которые необходимо выполнять через XCOM (это делается через один и тот же xcom!): < /P>

Код: Выделить всё

tracker = BranchPythonOperator(
task_id="tracker",
python_callable=get_task_for_update,
op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_list }}"},
trigger_rule=TriggerRule.NONE_FAILED,
)
tracker_second = BranchPythonOperator(
task_id="tracker_second",
python_callable=get_task_for_update,
op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_list }}"},
trigger_rule=TriggerRule.NONE_FAILED,
)
Мне нужно выполнить определенные задачи, имена которых проходят через переменную xcom manual_run_task_list . Более того, задачи из первой и второй групп не должны перекрываться. Я прошел следующие параметры: < /p>

Код: Выделить всё

{
"manual_run_task_list":[
0:"task_1"
1:"task_22"
2:"task_33"
3:"task_444"
4:"task_555"
5:"task_666"
]
}
Три задачи из первой группы и три задачи из второй группы должны выполняться, но на практике результат отличается от результата:
result
Во время исследования xcom, я обнаружил, что "tracher_second"

Код: Выделить всё

"tracker":
return_value: ['task_1', 'task_22', 'task_33', 'task_444', 'task_555', 'task_666']
skipmixin_key:  {'followed': ['task_22', 'task_33', 'task_1']}

"tracker_second":
return_value: ['task_1', 'task_22', 'task_33', 'task_444', 'task_555', 'task_666']
skipmixin_key:  {'followed': ['task_333', 'task_444', 'task_555', 'task_666', 'task_111', 'task_222']}
Он выполнял все задачи, а не только конкретные, такие как «Tracker» Did.
Не могли бы вы помочь мне понять причину такого поведения? manual_run_task_list для каждой группы задач вместо одной для всех задач. То есть, для «Tracker», у вас будет один список задач из первой группы, и для «tracker_second» у вас будет второй список. Пример: < /p>

Код: Выделить всё

tracker = BranchPythonOperator(
task_id="tracker",
python_callable=get_task_for_update,
op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_FIRST_GROUP }}"},
trigger_rule=TriggerRule.NONE_FAILED,
)
tracker_second = BranchPythonOperator(
task_id="tracker_second",
python_callable=get_task_for_update,
op_kwargs={"manual_run_task_list": "{{ params.manual_run_task_SECOND_GROUP }}"},
trigger_rule=TriggerRule.NONE_FAILED,
)
Тем не менее, это, похоже, усложняет структуру трубопровода, поэтому я хотел бы избежать этого.

Подробнее здесь: https://stackoverflow.com/questions/794 ... owing-ways
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    36 Просмотры
    Последнее сообщение Anonymous
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    22 Просмотры
    Последнее сообщение Anonymous
  • Скопируйте проект Airflow в каталог Airflow DAG.
    Anonymous » » в форуме Python
    0 Ответы
    28 Просмотры
    Последнее сообщение Anonymous
  • В воздушном потоке 2.10 Могу ли я использовать динамическое отображение задач с BranchPythonoperator?
    Anonymous » » в форуме Python
    0 Ответы
    2 Просмотры
    Последнее сообщение Anonymous
  • Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»