Воздушный поток вложенных операторов, хорошая практика?Python

Программы на Python
Ответить
Anonymous
 Воздушный поток вложенных операторов, хорошая практика?

Сообщение Anonymous »

Моя цель — передать строку пути к файлу из одного @task в Emailoperators, чтобы я мог применить логику к набору данных, который я буду читать из пути к файлу, для создания своих операторов, которые будут отправлять электронные письма. Мой код выглядит так:

Код: Выделить всё

from airflow import DAG
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import datetime

import polars as pl

@dag(
dag_id="data_to_taskgroup_dag",
start_date=days_ago(2),
schedule_interval=None,
)
def data_to_taskgroup_dag():
@task
def produce_file_path():
file_path = "path/to/your/dataframe.csv"
return file_path

@task
def process_dataframe(file_path):
df = pl.read_csv(file_path)

for _, row in df.iter_rows():
email_subject = f"Email for {row['column_name']}"
email_body = f"Email body: {row['another_column']}"

EmailOperator(
task_id=f"send_email_{row['column_name']}",
to='recipient@example.com',
subject=email_subject,
html_content=email_body
).execute(context=kwargs)

file_path = produce_file_path()
process_dataframe(file_path)
Проблема этого подхода заключается в том, что я вставляю операторы электронной почты в задачуprocess_dataframe, поэтому я не могу видеть сообщения электронной почты в пользовательском интерфейсе, вложение в основном делается для того, чтобы иметь возможность получить xcom из задачи вышестоящего уровня, если я не украслюprocess_dataframe и оставлю как простую функцию, эта строка завершится неудачей, df = pl.read_csv(file_path), потому что теперь file_path — это не строка, а Plainxcomarg. Каков подход/практика для такого рода ситуаций?

Подробнее здесь: https://stackoverflow.com/questions/792 ... d-practice
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»