Airflow DAG застрял при фильтрации DataFrame Polars DataFramePython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Airflow DAG застрял при фильтрации DataFrame Polars DataFrame

Сообщение Anonymous »

Я динамически генерирую даги воздушного потока на основе данных из DataFrame Polars. Определение DAG включает в себя фильтрацию этого фрейма Data Frame при создании DAG и снова внутри задачи, когда DAG работает. Сгенерированный DAG, задача застряла на неопределенный срок после печати перед фильтром , не выдвигая ошибки. Просто застрял и работает навсегда, пока исключение воздушного потока не будет выпущено на использование памяти. p>
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import polars as pl

def dag_constructor(name):
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
}

# Define the DAG
dag = DAG(
dag_id=f'{name}',
default_args=default_args,
description='A simple DAG to print Hello World',
schedule_interval='@daily',
catchup=False,
)

def print_hello():
print("starting")

df = pl.DataFrame({
"key": ["A", "B", "A"],
"branch": ["br1", "ooo", "br2"],
"chain": ["ch1", "Y", "ch2"]
})

print(df)
print("before filter")
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
print("after filter")
print(chains)
print("finish dag")

hello_task = PythonOperator(
task_id='print_hello',
python_callable=print_hello,
dag=dag,
)

hello_task

return dag

df = pl.DataFrame({
"key": ["A", "B", "A"],
"branch": ["br1", "ooo", "br2"],
"chain": ["ch1", "Y", "ch2"]
})
chains = df.filter(pl.col("key") == "A").select("chain").to_series().to_list()
## chains = ["ch1", "ch2"] THIS WOULD WORK, AND WONT GET STUCK, if uncommenting and commenting previous line
for ch in chains:
dag_my_id = f"aa__{str(ch)}"
globals()[dag_my_id] = dag_constructor("aa__"+ch)



Подробнее здесь: https://stackoverflow.com/questions/794 ... -dataframe
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    37 Просмотры
    Последнее сообщение Anonymous
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Скопируйте проект Airflow в каталог Airflow DAG.
    Anonymous » » в форуме Python
    0 Ответы
    29 Просмотры
    Последнее сообщение Anonymous
  • Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
    Anonymous » » в форуме Python
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»