Воздушный поток on_success_callbackPython

Программы на Python
Ответить
Anonymous
 Воздушный поток on_success_callback

Сообщение Anonymous »

Я пытаюсь выполнить программу, которая отправляет уведомление по электронной почте об успехе, используя on_success_callback. Я добавил следующую функцию в начало скрипта. Задание успешно завершается в Airflow, но уведомление об успехе по электронной почте не получено.Что-то не хватает в функции?

Код: Выделить всё

from airflow.utils.email import send_email

def email_success(context):
subject = f"Job {context['task_instance_key_str']} Completed"
html_content = f"""
The task {context['task_instance'].task_id}[/b][b]    in DAG {context['dag'].dag_id}[/b] completed successfully.
"""
send_email(to=['xyz@xyz.com'], subject=subject, html_content=html_content)

default_args = {
'owner': 'ABC',
'depends_on_past': False,
'email': ['xyz@xyz.com'],
'email_on_failure': True,
'on_success_callback': email_success,
'start_date': days_ago(1),
'schedule': '@daily'
}
Следующий тестовый обратный вызов также не отправляет уведомление по электронной почте. Есть ли что-то неправильное?

Код: Выделить всё

from airflow import DAG
from datetime import datetime, timedelta
from airflow.utils.email import send_email
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def success_email(context):
task_instance = context['task_instance']
task_status = 'Success'
subject = f'Airflow Task {task_instance.task_id} {task_status}'
body = f'The task {task_instance.task_id} completed with status : {task_status}. \n\n'\
f'The task execution date is: {context["execution_date"]}\n'\
f'Log url: {task_instance.log_url}\n\n'
to_email = 'xyz@xyz.com'
send_email(to = to_email, subject = subject, html_content = body)

def failure_email(context):
task_instance = context['task_instance']
task_status = 'Failed'
subject = f'Airflow Task {task_instance.task_id} {task_status}'
body = f'The task {task_instance.task_id} completed with status : {task_status}. \n\n'\
f'The task execution date is: {context["execution_date"]}\n'\
f'Log url: {task_instance.log_url}\n\n'
to_email = 'xyz@xyz.com'
send_email(to = to_email, subject = subject, html_content = body)

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 3, 17),
'schedule_interval' : 'None',
'email_on_failure': True,
'email_on_success': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5)
}

dag = DAG(
'dag_id',
default_args = default_args,
description = 'description of your dag',
schedule_interval = None,
catchup = False,
)

def python_command1():
print("How are you?!")

task1 = PythonOperator(
task_id = 'execute_python_command',
python_callable = python_command1,
on_success_callback = success_email,
on_failure_callback = failure_email,
provide_context = True,
dag = dag
)

task2 = BashOperator(
task_id = 'execute_bash_command',
bash_command = 'echo "Hello, world!"',
on_success_callback = lambda context: success_email(context),
on_failure_callback = lambda context: failure_email(context),
dag = dag
)

task1 >> task2



Подробнее здесь: https://stackoverflow.com/questions/793 ... s-callback
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»