import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import dag, task, chain
from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.timetables.interval import CronDataIntervalTimetable
def output_processor_as_dict(results, descriptions):
columns = [x[0] for x in descriptions[0]]
return [ [dict(zip(columns, row)) for row in results[0]] ]
@dag(
"troublesome_dag",
default_args={
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(minutes=1)
},
schedule=CronDataIntervalTimetable("*/10 * * * *", timezone="Europe/Copenhagen"),
start_date=datetime(2026,1,1),
catchup=False,
tags=["db"],
)
def trouble_dag():
fetch_sql = SQLExecuteQueryOperator(
task_id = 'fetch_processes',
conn_id = 'conn_id',
show_return_value_in_logs=True,
requires_result_fetch = True,
do_xcom_push=True,
output_processor=output_processor_as_dict,
sql = r"""SELECT TOP (50) * FROM tbl;""",
)
@task(show_return_value_in_logs=True)
def make_diff_times(results: list, **kwargs):
results.sort(key=lambda x: f'{x['id']}')
previous = None
for row in results:
if previous is not None and previous['group'] == row['group']
# do something to row
return results
@task(show_return_value_in_logs=True)
def handle_row(row, **kwargs):
if row['station'] == 'update':
# query another database for more data
return row
submit = GrpcOperator(task_id='submit', ...)
rows = make_diff_times(fetch_sql.output) ## working as expected
handle_process.expand(row=rows) ## working as expected
handle_process >> submit ## ???
trouble_dag()
Я также пытался изучить ShortCircuitOperator, чтобы избежать дальнейших задач и разветвления задач, но мне не хватило примеров, поскольку они либо основаны на операторах, либо на задачах, и редко примеры показывают, как передавать данные от задачи к задаче.
У меня возникли проблемы с написанием DAG Apache Airflow (v. 3.1.7) для следующего конвейера: [list] [*]Извлечение строк из базы данных MS SQL на основе data_interval
[*]Выходные данные из (1) сортируются и обрабатываются в функции Python.
[*]Для каждой строки из (2), необходимо выполнить некоторые дополнительные SQL-запросы и добавить данные.
[*]Отправить данные.
[/list] Проблема начинается с сильного смешивания «Операторов» и «задач», когда последние получают входные данные от первых и наоборот. Вот макет: [code]import logging from datetime import datetime, timedelta from zoneinfo import ZoneInfo
# The DAG object; we'll need this to instantiate a DAG from airflow.sdk import dag, task, chain from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator from airflow.timetables.interval import CronDataIntervalTimetable
def output_processor_as_dict(results, descriptions): columns = [x[0] for x in descriptions[0]] return [ [dict(zip(columns, row)) for row in results[0]] ]
@task(show_return_value_in_logs=True) def make_diff_times(results: list, **kwargs): results.sort(key=lambda x: f'{x['id']}') previous = None for row in results: if previous is not None and previous['group'] == row['group'] # do something to row return results
@task(show_return_value_in_logs=True) def handle_row(row, **kwargs): if row['station'] == 'update': # query another database for more data return row
submit = GrpcOperator(task_id='submit', ...)
rows = make_diff_times(fetch_sql.output) ## working as expected handle_process.expand(row=rows) ## working as expected handle_process >> submit ## ???
trouble_dag() [/code] Я также пытался изучить ShortCircuitOperator, чтобы избежать дальнейших задач и разветвления задач, но мне не хватило примеров, поскольку они либо основаны на операторах, либо на задачах, и редко примеры показывают, как передавать данные от задачи к задаче.