Я пишу DAG на воздушном потоке со следующей структурой: < /p>
Выполнить простой выбор (*) в базе данных Starburst с SQLEXECUTEQUEREOPERATO /> Это DAG: < /p>
import pendulum, os, logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import TaskInstance
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime
def catch_output(ti, **kwargs):
query_result=ti.xcom_pull(task_ids="query_task")
logging.info(f"{query_result}")
with DAG(
'dag_name',
schedule=None,
start_date=pendulum.datetime(2025,2,25, tz='UTC'),
max_active_runs=1,
catchup=False,
default_args={...}
) as dag:
query_task = SQLExecuteQueryOperator(
task_id='query_task',
conn_id='xyz',
show_return_value_in_logs=True,
split_statement=True,
sql='SELECT COUNT(*) FROM TABLE',
return_last=True,
do_xcom_push=True
)
catch_output = PythonOperator(
task_id='catch_output',
python_callable=catch_output,
dag=dag
)
query_task >> catch_output
< /code>
Первая задача правильно работает, а вывод регистрируется в воздушном потоке. Но вторая задача, несмотря на его завершение с успехом, ничего не регистрирует. Переменная Query_Result не имеет значения нет.
Я думаю, что причина в том, что я ничто не подтолкнуло к XCOM по задаче, которая запускает запрос на Starburst, но я не знаю, как решить эту проблему.>
Подробнее здесь: https://stackoverflow.com/questions/794 ... ryoperator
Как получить выходной сигнал SQL -запроса, выполненного с помощью SQLEXECUTEQUERYOPERATOR в воздушном потоке ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как прочитать вывод расширенного SQLEXECUTEQUERYOPERATOR в воздушном потоке?
Anonymous » » в форуме Python - 0 Ответы
- 1 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Вернуть результат рекурсивно выполненного запроса с использованием codeigniter
Anonymous » » в форуме Php - 0 Ответы
- 9 Просмотры
-
Последнее сообщение Anonymous
-