Код: Выделить всё
from airflow import DAG
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from datetime import datetime
import polars as pl
@dag(
dag_id="data_to_taskgroup_dag",
start_date=days_ago(2),
schedule_interval=None,
)
def data_to_taskgroup_dag():
@task
def produce_file_path():
file_path = "path/to/your/dataframe.csv"
return file_path
@task
def process_dataframe(file_path):
df = pl.read_csv(file_path)
for _, row in df.iter_rows():
email_subject = f"Email for {row['column_name']}"
email_body = f"Email body: {row['another_column']}"
EmailOperator(
task_id=f"send_email_{row['column_name']}",
to='recipient@example.com',
subject=email_subject,
html_content=email_body
).execute(context=kwargs)
file_path = produce_file_path()
process_dataframe(file_path)
Подробнее здесь: https://stackoverflow.com/questions/792 ... d-practice
Мобильная версия