Выполнение SQL-запросов из задачи, а не из SqlExecuteQueryOperator.Python

Программы на Python
Ответить
Anonymous
 Выполнение SQL-запросов из задачи, а не из SqlExecuteQueryOperator.

Сообщение Anonymous »

У меня возникли проблемы с написанием DAG Apache Airflow (v. 3.1.7) для следующего конвейера:
  • Извлечение строк из базы данных MS SQL на основе data_interval
  • Выходные данные из (1) сортируются и обрабатываются в функции Python.
  • Для каждой строки из (2), необходимо выполнить некоторые дополнительные SQL-запросы и добавить данные.
  • Отправить данные.
Проблема начинается с сильного смешивания «Операторов» и «задач», когда последние получают входные данные от первых и наоборот.
Вот макет:

Код: Выделить всё

import logging
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# The DAG object; we'll need this to instantiate a DAG
from airflow.sdk import dag, task, chain
from airflow.providers.standard.operators.python import PythonOperator, BranchPythonOperator, ShortCircuitOperator
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.timetables.interval import CronDataIntervalTimetable

def output_processor_as_dict(results, descriptions):
columns = [x[0] for x in descriptions[0]]
return [ [dict(zip(columns, row)) for row in results[0]] ]

@dag(
"troublesome_dag",
default_args={
"depends_on_past": False,
"retries": 3,
"retry_delay": timedelta(minutes=1)
},
schedule=CronDataIntervalTimetable("*/10 * * * *", timezone="Europe/Copenhagen"),
start_date=datetime(2026,1,1),
catchup=False,
tags=["db"],
)
def trouble_dag():
fetch_sql = SQLExecuteQueryOperator(
task_id = 'fetch_processes',
conn_id = 'conn_id',
show_return_value_in_logs=True,
requires_result_fetch = True,
do_xcom_push=True,
output_processor=output_processor_as_dict,
sql = r"""SELECT TOP (50) * FROM tbl;""",
)

@task(show_return_value_in_logs=True)
def make_diff_times(results: list, **kwargs):
results.sort(key=lambda x: f'{x['id']}')
previous = None
for row in results:
if previous is not None and previous['group'] == row['group']
# do something to row
return results

@task(show_return_value_in_logs=True)
def handle_row(row, **kwargs):
if row['station'] == 'update':
# query another database for more data
return row

submit = GrpcOperator(task_id='submit', ...)

rows = make_diff_times(fetch_sql.output)  ## working as expected
handle_process.expand(row=rows)           ## working as expected
handle_process >> submit                  ## ???

trouble_dag()
Я также пытался изучить ShortCircuitOperator, чтобы избежать дальнейших задач и разветвления задач, но мне не хватило примеров, поскольку они либо основаны на операторах, либо на задачах, и редко примеры показывают, как передавать данные от задачи к задаче.


Подробнее здесь: https://stackoverflow.com/questions/798 ... ryoperator
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»