Код: Выделить всё
@task
def metrics_aggregation_task():
try:
logging.info("Starting metrics aggregation task")
logging.info("Initializing engines")
LANGFUSE_DATABASE_URL = os.getenv("LANGFUSE_DATABASE_URL")
langfuse_engine = create_engine(LANGFUSE_DATABASE_URL)
AIRFLOW_DATABASE_URL = os.getenv("AIRFLOW_DATABASE_URL")
airflow_engine = create_engine(AIRFLOW_DATABASE_URL)
AUTOMODELLING_DATABASE_URL = os.getenv("MODELLING_DATABASE_URL")
automodelling_engine = create_engine(AUTOMODELLING_DATABASE_URL)
# Define the session
session_automodelling = Session(bind=automodelling_engine)
session_langfuse = Session(bind=langfuse_engine)
session_airflow = Session(bind=airflow_engine)
logging.info("Sessions created")
product_id_and_run_id_query = """
SELECT "productId", "id", "metadata" FROM "Dags"
"""
logging.info("Fetching product_ids_and_run_ids")
product_ids_and_run_ids = session_automodelling.execute(
product_id_and_run_id_query
).fetchall()
logging.info(f"Fetched product_ids_and_run_ids : {product_ids_and_run_ids}")
Код: Выделить всё
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:52} INFO - Starting metrics aggregation task
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:54} INFO - Initializing engines
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:69} INFO - Sessions created
[2024-03-19, 13:02:58 CET] {metrics_aggregation.py:74} INFO - Fetching product_ids_and_run_ids
Заранее спасибо за вашу помощь.
РЕДАКТИРОВАТЬ: код заключен в блок try/Exception, но никаких ошибок не возникает, это похоже на то, как если бы код ожидал выполнения операции.
И DAG, и задача работают при использовании dags airflow test или задачи airflow test. Эта проблема возникает, в частности, во время выполнения планировщика/веб-сервера воздушного потока.
Подробнее здесь: https://stackoverflow.com/questions/781 ... operations