Воздушный поток — задача выполняется, но не заканчивается операциями SQLAlchemyPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Воздушный поток — задача выполняется, но не заканчивается операциями SQLAlchemy

Сообщение Anonymous »

Я застрял в проблеме, из-за которой моя задача не выходит за рамки выполнения запросов SQLAlchemy:

Код: Выделить всё

@task
def metrics_aggregation_task():
try:
logging.info("Starting metrics aggregation task")

logging.info("Initializing engines")
LANGFUSE_DATABASE_URL = os.getenv("LANGFUSE_DATABASE_URL")
langfuse_engine = create_engine(LANGFUSE_DATABASE_URL)

AIRFLOW_DATABASE_URL = os.getenv("AIRFLOW_DATABASE_URL")
airflow_engine = create_engine(AIRFLOW_DATABASE_URL)

AUTOMODELLING_DATABASE_URL = os.getenv("MODELLING_DATABASE_URL")
automodelling_engine = create_engine(AUTOMODELLING_DATABASE_URL)

# Define the session
session_automodelling = Session(bind=automodelling_engine)
session_langfuse = Session(bind=langfuse_engine)
session_airflow = Session(bind=airflow_engine)

logging.info("Sessions created")
product_id_and_run_id_query = """
SELECT "productId", "id", "metadata"  FROM "Dags"
"""

logging.info("Fetching product_ids_and_run_ids")
product_ids_and_run_ids = session_automodelling.execute(
product_id_and_run_id_query
).fetchall()

logging.info(f"Fetched product_ids_and_run_ids : {product_ids_and_run_ids}")
При проверке журналов задачи я вижу, что предыдущие журналы работают, что указывает на то, что задача запущена и работает:

Код: Выделить всё

[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:52} INFO - Starting metrics aggregation task
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:54} INFO - Initializing engines
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:69} INFO - Sessions created
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:74} INFO - Fetching product_ids_and_run_ids
Я не совсем понимаю, почему это происходит. Я считаю, что это связано с какой-то асинхронной операцией, происходящей с сеансом, но я не знаю, как поступить правильно.
Заранее спасибо за вашу помощь.
РЕДАКТИРОВАТЬ: код заключен в блок try/Exception, но никаких ошибок не возникает, это похоже на то, как если бы код ожидал выполнения операции.
И DAG, и задача работают при использовании dags airflow test или задачи airflow test. Эта проблема возникает, в частности, во время выполнения планировщика/веб-сервера воздушного потока.

Подробнее здесь: https://stackoverflow.com/questions/781 ... operations
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Воздушный поток Apache: имя модуля «воздушный поток» отсутствует. Задача не выполнена
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Задача выбора активности с резервными операциями
    Anonymous » » в форуме C++
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous
  • Воздушный поток: pgpy encrypt выдает NotImplementedError: ошибка PubKeyAlgorithm.RSAEncrypt
    Anonymous » » в форуме Python
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Воздушный поток запланирован с параметрами конфигурации dag
    Anonymous » » в форуме Python
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Воздушный поток — другой интервал графика обратной засыпки.
    Anonymous » » в форуме Python
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»