Производитель группы обеспечения доступности баз данных:
Код: Выделить всё
from airflow.datasets import Dataset
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.empty import EmptyOperator
MY_DATA = Dataset('bigquery://my-project-name/my-schema/-my-table')
data_set_operator = EmptyOperator(
task_id="producer",
outlets = MY_DATA
)
default_args = {
"start_date": (2024,11,20),
"depends_on_past": False,
"on_failure_callback": some_function
}
with DAG as (
dag_id = "my_dag",
max_active_runs = 1
default_args = default_dags,
schedule_interval = "30 8 * * *",
) as dag:
sql_task = SQLExecuteQueryOperator(
task_id = task_id
query = "my_query"
conn_id = "bq_conn_id"
params = my_dictionary
)
sql_task >> data_set_operator
Код: Выделить всё
from airflow.datasets import Dataset
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
MY_DATA = Dataset('bigquery://my-project-name/my-schema/-my-table')
default_args = {
"start_date": (2024,11,20),
"depends_on_past": False,
"on_failure_callback": some_function
}
with DAG as (
dag_id = "my_dag",
max_active_runs = 1
# if I comment out default args the dag works.
default_args = default_dags,
schedule = MY_DATA
) as dag:
sql_task = SQLExecuteQueryOperator(
task_id = task_id
query = "my_query2"
conn_id = "bq_conn_id"
params = my_dictionary2
)
Подробнее здесь: https://stackoverflow.com/questions/792 ... in-airflow
Мобильная версия