Как получить выходной сигнал SQL -запроса, выполненного с помощью SQLEXECUTEQUERYOPERATOR в воздушном потокеPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как получить выходной сигнал SQL -запроса, выполненного с помощью SQLEXECUTEQUERYOPERATOR в воздушном потоке

Сообщение Anonymous »

Я пишу DAG на воздушном потоке со следующей структурой: < /p>

Выполнить простой выбор (*) в базе данных Starburst с SQLEXECUTEQUEREOPERATO /> Это DAG: < /p>
import pendulum, os, logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import TaskInstance
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime

def catch_output(ti, **kwargs):
query_result=ti.xcom_pull(task_ids="query_task")
logging.info(f"{query_result}")

with DAG(
'dag_name',
schedule=None,
start_date=pendulum.datetime(2025,2,25, tz='UTC'),
max_active_runs=1,
catchup=False,
default_args={...}
) as dag:

query_task = SQLExecuteQueryOperator(
task_id='query_task',
conn_id='xyz',
show_return_value_in_logs=True,
split_statement=True,
sql='SELECT COUNT(*) FROM TABLE',
return_last=True,
do_xcom_push=True
)

catch_output = PythonOperator(
task_id='catch_output',
python_callable=catch_output,
dag=dag
)

query_task >> catch_output

< /code>
Первая задача правильно работает, а вывод регистрируется в воздушном потоке. Но вторая задача, несмотря на его завершение с успехом, ничего не регистрирует. Переменная Query_Result не имеет значения нет.
Я думаю, что причина в том, что я ничто не подтолкнуло к XCOM по задаче, которая запускает запрос на Starburst, но я не знаю, как решить эту проблему.>

Подробнее здесь: https://stackoverflow.com/questions/794 ... ryoperator
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как прочитать вывод расширенного SQLEXECUTEQUERYOPERATOR в воздушном потоке?
    Anonymous » » в форуме Python
    0 Ответы
    1 Просмотры
    Последнее сообщение Anonymous
  • Как установить Dynamic execution_timeout для задачи в воздушном потоке с помощью GkePodoperator?
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Вернуть результат рекурсивно выполненного запроса с использованием codeigniter
    Anonymous » » в форуме Php
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Перехват исключений для выполненного sql в искре
    Anonymous » » в форуме JAVA
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Перехват исключений для выполненного sql в искре
    Anonymous » » в форуме JAVA
    0 Ответы
    13 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»