Как передать несколько выходных данных потока задач из одной задачи в две последующие задачи потока задач с помощью ShorPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как передать несколько выходных данных потока задач из одной задачи в две последующие задачи потока задач с помощью Shor

Сообщение Anonymous »

Я пытаюсь использовать ShortCircuitOperator в качестве контрольной точки между начальной задачей и двумя последующими задачами, которые выполняются параллельно. Я использую Airflow 2.0.2. Проблема, с которой я столкнулся, заключается в том, что ShortCircuitOperator всегда либо запускается параллельно с двумя последними задачами, либо последние 2 задачи выполняются одновременно с ShortCircuitOperator И ниже по потоку от ShortCircuitOperator. Кроме того, две последующие задачи и запуск ShortCircuitOperator с выходными данными образуют начальную задачу.
Я пытался установить зависимости различными способами в рамках итерации для двух уникальных идентификаторов< /p>

Код: Выделить всё

   for id in ids:
get_table_data = get_data(Id)
table_evaluation = ShortCircuitOperator(
task_id=f'table_evaluation_{Id}',
python_callable=evaluation,
op_args=[get_table_data["table_eval"]]
)
mapping_table_build = build_mapping_tables(get_table_data["ext"],get_table_data["name"])
survey_table_build =     update_survey_tables(get_table_data["ext"],get_table_data["name"],get_table_data["Token"])
get_table_data >> table_evaluation >> [mapping_table_build,survey_table_build]
Предоставляет мне как table_evaluation,mapping_table_build, так и Survey_table_build параллельно, а также [mapping_table_build,survey_table_build] ниже по потоку от table_evaluation.
Если я просто позволю поток задач для установки зависимостей

Код: Выделить всё

   for id in ids:
get_table_data = get_data(Id)
table_evaluation = ShortCircuitOperator(
task_id=f'table_evaluation_{Id}',
python_callable=evaluation,
op_args=[get_table_data["table_eval"]]
)
build_mapping_tables(get_table_data["ext"],get_table_data["name"])
update_survey_tables(get_table_data["ext"],get_table_data["name"],get_table_data["Token"])
тогда все три (table_evaluation,build_mapping_tables и update_survey_tables) находятся только параллельно ниже по потоку от get_table_data.
Даже если я попытаюсь передать выходные данные из table_evaluation как для build_mapping_tables, так и для update_survey_tables, я получаю задачи как параллельно, так и в нисходящем направлении

Подробнее здесь: https://stackoverflow.com/questions/791 ... k-flow-tas
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»