Условная логика Airflow Task_group всегда входит в блок else, даже если env имеет значение «PROD» или «STAGE».Python

Программы на Python
Ответить
Anonymous
 Условная логика Airflow Task_group всегда входит в блок else, даже если env имеет значение «PROD» или «STAGE».

Сообщение Anonymous »

Проблема
У меня есть группа обеспечения доступности баз данных Airflow, которая использует @task_group с условной логикой для установки зависимостей задач на основе переменной среды. Цель:
  • В PROD или STAGE: запускать только table_task (без зависимости от восходящего потока)
  • Во всех остальных средах: запускать census_task >> table_task (последовательно)
Однако блок else всегда выполняется, даже если я подтвердил, что для переменной ENV установлено значение «PROD» или «STAGE».

CODE

Код: Выделить всё

from airflow.models import Variable

env = Variable.get("ENV")

# ... inside DAG context ...

@task_group()
def human_capital_tasks():

census_task = TriggerDagRunOperator(
task_id="term_census",
trigger_dag_id="term_census",
wait_for_completion=True,
trigger_rule=TriggerRule.ALL_DONE,
dag=dag,
)

table_task = TriggerDagRunOperator(
task_id="supervisor_table",
trigger_dag_id="supervisor_table",
wait_for_completion=True,
trigger_rule=TriggerRule.ALL_DONE,
dag=dag,
)

if env.strip().upper() in ["PROD", "STAGE"]:
table_task
else:
census_task >> table_task
Что я заметил
  • Переменная Airflow ENV имеет значение PROD (проверяется через пользовательский интерфейс Airflow и интерфейс командной строки).
  • Несмотря на это, на графике DAG всегда отображается census_task >> table_task (ветвь else).
  • Я добавил ведение журнала для печати env и подтвердил, что он возвращает «PROD»..
Что я пробовал
  • Проверил значение Variable.get("ENV"), распечатав его — он возвращает "PROD".
  • Проверено на наличие проблем с начальными/конечными пробелами или регистром (отсюда и .strip().upper()).
  • Перезапустил планировщик Airflow и веб-сервер после изменения переменной.
Мое понимание/подозрение
Я понимаю, что env = Variable.get("ENV") оценивается во время время анализа DAG, а if/else внутри @task_group также оценивается во время синтаксического анализа (он не откладывается на время выполнения). Таким образом, условное выражение должно работать. Но что-то заставляет его всегда переходить к else.
Вопросы
  • Есть ли что-то в том, как оценивает @task_group, что может привести к тому, что значение Variable.get() будет отличаться от ожидаемого во время анализа?
  • Может ли файл DAG анализироваться в контексте, где Переменная еще не доступна или возвращает значение по умолчанию/пустое значение?
  • Есть ли лучший шаблон для условного определения зависимостей задач на основе среды в Airflow 3.x+?
Среда
  • Версия Apache Airflow: (3)
  • Версия Python: (введите, например, 3.10)
  • Развертывание: ( Docker Compose, Kubernetes)


Подробнее здесь: https://stackoverflow.com/questions/799 ... hen-env-is
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»