Воздушный поток — остановка DAG в зависимости от состояния (пропустить оставшиеся задачи после ветвления)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Воздушный поток — остановка DAG в зависимости от состояния (пропустить оставшиеся задачи после ветвления)

Сообщение Anonymous »

Я новичок в airflow, поэтому у меня есть сомнения.
Я хочу запустить DAG, если условие первой задачи выполнено. Если условие не удовлетворено, я хочу остановить обработку данных после первой задачи.
Пример:

Код: Выделить всё

# first task
def get_number_func(**kwargs):

number = randint(0, 10)
print(number)

if (number >= 5):
print('A')
return 'continue_task'
else:
#STOP DAG

# second task if number is higher or equal 5
def continue_func(**kwargs):
print("The number is " + str(number))

# first task declaration
start_op = BranchPythonOperator(
task_id='get_number',
provide_context=True,
python_callable=get_number_func,
op_kwargs={},
dag=DAG,
)

# second task declaration
continue_op = PythonOperator(
task_id='continue_task',
provide_context=True,
python_callable=continue_func,
op_kwargs={},
dag=DAG,
)

start_op  >> continue_op

Я запускаю вторую задачу только в том случае, если условие числа удовлетворено. Если условие не проверено, группа обеспечения доступности баз данных не должна запускать вторую задачу.
Как это сделать? Я не хочу использовать xcom, глобальные переменные или фиктивную задачу.
Заранее спасибо!

Подробнее здесь: https://stackoverflow.com/questions/673 ... ter-branch
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»