Проблема
У меня есть группа обеспечения доступности баз данных Airflow, которая использует @task_group с условной логикой для установки зависимостей задач на основе переменной среды. Цель:
В PROD или STAGE: запускать только table_task (без зависимости от восходящего потока)
Во всех остальных средах: запускать census_task >> table_task (последовательно)
Однако блок else всегда выполняется, даже если я подтвердил, что для переменной ENV установлено значение «PROD» или «STAGE».
Переменная Airflow ENV имеет значение PROD (проверяется через пользовательский интерфейс Airflow и интерфейс командной строки).
Несмотря на это, на графике DAG всегда отображается census_task >> table_task (ветвь else).
Я добавил ведение журнала для печати env и подтвердил, что он возвращает «PROD»..
Что я пробовал
Проверил значение Variable.get("ENV"), распечатав его — он возвращает "PROD".
Проверено на наличие проблем с начальными/конечными пробелами или регистром (отсюда и .strip().upper()).
Перезапустил планировщик Airflow и веб-сервер после изменения переменной.
Мое понимание/подозрение
Я понимаю, что env = Variable.get("ENV") оценивается во время время анализа DAG, а if/else внутри @task_group также оценивается во время синтаксического анализа (он не откладывается на время выполнения). Таким образом, условное выражение должно работать. Но что-то заставляет его всегда переходить к else.
Вопросы
Есть ли что-то в том, как оценивает @task_group, что может привести к тому, что значение Variable.get() будет отличаться от ожидаемого во время анализа?
Может ли файл DAG анализироваться в контексте, где Переменная еще не доступна или возвращает значение по умолчанию/пустое значение?
Есть ли лучший шаблон для условного определения зависимостей задач на основе среды в Airflow 3.x+?
Проблема У меня есть группа обеспечения доступности баз данных Airflow, которая использует @task_group с условной логикой для установки зависимостей задач на основе переменной среды. Цель: [list] [*]В [b]PROD[/b] или [b]STAGE[/b]: запускать только table_task (без зависимости от восходящего потока)
[*]Во [b]всех остальных средах[/b]: запускать census_task >> table_task (последовательно)
[/list] Однако [b]блок else всегда выполняется[/b], даже если я подтвердил, что для переменной ENV установлено значение «PROD» или «STAGE».
if env.strip().upper() in ["PROD", "STAGE"]: table_task else: census_task >> table_task [/code] Что я заметил [list] [*]Переменная Airflow ENV имеет значение PROD (проверяется через пользовательский интерфейс Airflow и интерфейс командной строки).
[*]Несмотря на это, на графике DAG всегда отображается census_task >> table_task (ветвь else).
[*]Я добавил ведение журнала для печати env и подтвердил, что он возвращает «PROD»..
[/list] Что я пробовал [list] [*]Проверил значение Variable.get("ENV"), распечатав его — он возвращает "PROD".
[*]Проверено на наличие проблем с начальными/конечными пробелами или регистром (отсюда и .strip().upper()).
[*]Перезапустил планировщик Airflow и веб-сервер после изменения переменной.
[/list] Мое понимание/подозрение Я понимаю, что env = Variable.get("ENV") оценивается во время [b]время анализа DAG[/b], а if/else внутри @task_group также оценивается во время синтаксического анализа (он не откладывается на время выполнения). Таким образом, условное выражение должно работать. Но что-то заставляет его всегда переходить к else. Вопросы [list] [*]Есть ли что-то в том, как оценивает @task_group, что может привести к тому, что значение Variable.get() будет отличаться от ожидаемого во время анализа?
[*]Может ли файл DAG анализироваться в контексте, где Переменная еще не доступна или возвращает значение по умолчанию/пустое значение?
[*]Есть ли лучший шаблон для условного определения зависимостей задач на основе среды в Airflow 3.x+?