Параметры для набора данных и планирования с учетом данных в AirflowPython

Программы на Python
Ответить
Anonymous
 Параметры для набора данных и планирования с учетом данных в Airflow

Сообщение Anonymous »

Были ли у кого-нибудь проблемы с тем, что в группе обеспечения доступности баз данных задачи не были завершены при передаче default_args в группу обеспечения доступности данных потребителя?
Производитель группы обеспечения доступности баз данных:

Код: Выделить всё

from airflow.datasets import Dataset
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.empty import EmptyOperator

MY_DATA = Dataset('bigquery://my-project-name/my-schema/-my-table')

data_set_operator = EmptyOperator(
task_id="producer",
outlets = MY_DATA
)

default_args = {
"start_date": (2024,11,20),
"depends_on_past": False,
"on_failure_callback": some_function
}

with DAG as (
dag_id = "my_dag",
max_active_runs = 1
default_args = default_dags,
schedule_interval = "30 8 * * *",
) as dag:

sql_task = SQLExecuteQueryOperator(
task_id = task_id
query = "my_query"
conn_id = "bq_conn_id"
params = my_dictionary
)

sql_task >> data_set_operator

Потребительский DAG Downstream

Код: Выделить всё

from airflow.datasets import Dataset
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

MY_DATA = Dataset('bigquery://my-project-name/my-schema/-my-table')

default_args = {
"start_date": (2024,11,20),
"depends_on_past": False,
"on_failure_callback": some_function
}

with DAG as (
dag_id = "my_dag",
max_active_runs = 1
# if I comment out default args the dag works.
default_args = default_dags,
schedule = MY_DATA
) as dag:

sql_task = SQLExecuteQueryOperator(
task_id = task_id
query = "my_query2"
conn_id = "bq_conn_id"
params = my_dictionary2
)

Когда даг производителя завершается, нисходящий запуск завершается, но фактическая задача не запускается, а задача пуста/отсутствует или не зеленеет. После того как я закомментирую/удалю параметры default_args из потребительской группы обеспечения доступности баз данных, он запускается соответствующим образом, и я вижу, что задача действительно выполняется. Можем ли мы не передавать default_args в потребительскую группу обеспечения доступности баз данных?

Подробнее здесь: https://stackoverflow.com/questions/792 ... in-airflow
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»