Что-то отсутствует или что-то добавлено, чтобы остановить его? от работы и запуска темы SNS? Это код Python, который я набрал, который должен запускать тему SNS при сбое DAG:
Код: Выделить всё
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from pathlib import Path
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
import boto3
from airflow.providers.amazon.aws.hooks.sns import SnsHook
DAG_NAME = 'dag_name'
# Funciton which triggers SNS notification on failure
def failure_callback(context):
sns_notification = SnsPublishOperator(
sns_hook = SnsHook(sns_topic_arn='My Topic ARN'),
dag_id = context["dag"].dag_id,
task_id = context["task_instance"].task_id,
exception = str(context.get("exception")),
log_url = context["task_instance"].log_url
)
# Trigger SNS notification
sns_client = boto3.client('sns', region_name='eu-west-2')
sns_client.publish(
TopicArn='My Topic ARN',
Message=f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
)
# Function to intentionally fail DAG (ONLY FOR TESTING)
def fail_dag():
raise Exception("Intentional DAG failure for test")
# Define DAG
default_args = {
"on_failure_callback": failure_callback,
'owner': 'Me',
'start_date': datetime(2021, 12, 9),
'is_prod': False
}
with DAG(
dag_id='dag_name',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
# To fail task intentionally
fail_task = PythonOperator(
task_id='fail_task',
python_callable=fail_dag,
)
# Dummy start and end tasks for DAG
start_task = DummyOperator(task_id='start')
end_task = DummyOperator(task_id='end')
# Dependencies (chained)
start_task >> fail_task >> end_task
globals()[DAG_NAME] = dag
Подробнее здесь: https://stackoverflow.com/questions/791 ... ke-a-lambd