Я хочу запустить DAG, если условие первой задачи выполнено. Если условие не удовлетворено, я хочу остановить обработку данных после первой задачи.
Пример:
Код: Выделить всё
# first task
def get_number_func(**kwargs):
number = randint(0, 10)
print(number)
if (number >= 5):
print('A')
return 'continue_task'
else:
#STOP DAG
# second task if number is higher or equal 5
def continue_func(**kwargs):
print("The number is " + str(number))
# first task declaration
start_op = BranchPythonOperator(
task_id='get_number',
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
dag=DAG,
)
# second task declaration
continue_op = PythonOperator(
task_id='continue_task',
provide_context=True,
python_callable=continue_func,
op_kwargs={},
dag=DAG,
)
start_op >> continue_op
Как это сделать? Я не хочу использовать xcom, глобальные переменные или фиктивную задачу.
Заранее спасибо!
Подробнее здесь: https://stackoverflow.com/questions/673 ... ter-branch