Я динамически генерирую даги воздушного потока на основе данных из DataFrame Polars. Определение DAG включает в себя фильтрацию этого фрейма Data Frame при создании DAG и снова внутри задачи, когда DAG работает. Сгенерированный DAG, задача застряла на неопределенный срок после печати перед фильтром , не выдвигая ошибки. Просто застрял и работает навсегда, пока исключение воздушного потока не будет выпущено на использование памяти. p>
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl
def dag_constructor(name):
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}
# Define the DAG
dag = DAG(
dag_id=f'{name}',
default_args=default_args,
description='A simple DAG to print Hello World',
schedule_interval='@daily',
catchup=False,
)
def print_hello():
print("starting")
df = pl.DataFrame({
"key": ["A", "B", "A"],
"branch": ["br1", "ooo", "br2"],
"chain": ["ch1", "Y", "ch2"]
})
print(df)
print("before filter")
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
print("after filter")
print(chains)
print("finish dag")
hello_task = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)
hello_task
return dag
df = pl.DataFrame({
"key": ["A", "B", "A"],
"branch": ["br1", "ooo", "br2"],
"chain": ["ch1", "Y", "ch2"]
})
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
## chains = ["ch1", "ch2"] THIS WOULD WORK, AND WONT GET STUCK, if uncommenting and commenting previous line
for ch in chains:
dag_my_id = f"aa__{str(ch)}"
globals()[dag_my_id] = dag_constructor("aa__"+ch)
Подробнее здесь: https://stackoverflow.com/questions/794 ... -dataframe
Airflow DAG застрял при фильтрации DataFrame Polars DataFrame ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
Anonymous » » в форуме Python - 0 Ответы
- 37 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
Anonymous » » в форуме Python - 0 Ответы
- 23 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
Anonymous » » в форуме Python - 0 Ответы
- 17 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
Anonymous » » в форуме Python - 0 Ответы
- 16 Просмотры
-
Последнее сообщение Anonymous
-