Anonymous
Попытка запустить тему AWS SNS из-за сбоя Airflow DAG для вызова функции Lambda, которая создает билет JIRA для неудачно
Сообщение
Anonymous » 08 ноя 2024, 17:54
Я создал тестовый даг, который не работает при запуске тестирования, но я не могу активировать тему SNS после сбоя дага.
Что-то отсутствует или что-то добавлено, чтобы остановить его? от работы и запуска темы SNS? Это код Python, который я набрал, который должен запускать тему SNS при сбое DAG:
Код: Выделить всё
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from pathlib import Path
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
import boto3
from airflow.providers.amazon.aws.hooks.sns import SnsHook
DAG_NAME = 'dag_name'
# Funciton which triggers SNS notification on failure
def failure_callback(context):
sns_notification = SnsPublishOperator(
sns_hook = SnsHook(sns_topic_arn='My Topic ARN'),
dag_id = context["dag"].dag_id,
task_id = context["task_instance"].task_id,
exception = str(context.get("exception")),
log_url = context["task_instance"].log_url
)
# Trigger SNS notification
sns_client = boto3.client('sns', region_name='eu-west-2')
sns_client.publish(
TopicArn='My Topic ARN',
Message=f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
)
# Function to intentionally fail DAG (ONLY FOR TESTING)
def fail_dag():
raise Exception("Intentional DAG failure for test")
# Define DAG
default_args = {
"on_failure_callback": failure_callback,
'owner': 'Me',
'start_date': datetime(2021, 12, 9),
'is_prod': False
}
with DAG(
dag_id='dag_name',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:
# To fail task intentionally
fail_task = PythonOperator(
task_id='fail_task',
python_callable=fail_dag,
)
# Dummy start and end tasks for DAG
start_task = DummyOperator(task_id='start')
end_task = DummyOperator(task_id='end')
# Dependencies (chained)
start_task >> fail_task >> end_task
globals()[DAG_NAME] = dag
Может быть, мне что-то не хватает за пределами кода Python?
Подробнее здесь:
https://stackoverflow.com/questions/791 ... ke-a-lambd
1731077665
Anonymous
Я создал тестовый даг, который не работает при запуске тестирования, но я не могу активировать тему SNS после сбоя дага. Что-то отсутствует или что-то добавлено, чтобы остановить его? от работы и запуска темы SNS? Это код Python, который я набрал, который должен запускать тему SNS при сбое DAG: [code]from datetime import datetime from airflow.models import DAG from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator from pathlib import Path from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator import boto3 from airflow.providers.amazon.aws.hooks.sns import SnsHook DAG_NAME = 'dag_name' # Funciton which triggers SNS notification on failure def failure_callback(context): sns_notification = SnsPublishOperator( sns_hook = SnsHook(sns_topic_arn='My Topic ARN'), dag_id = context["dag"].dag_id, task_id = context["task_instance"].task_id, exception = str(context.get("exception")), log_url = context["task_instance"].log_url ) # Trigger SNS notification sns_client = boto3.client('sns', region_name='eu-west-2') sns_client.publish( TopicArn='My Topic ARN', Message=f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}" ) # Function to intentionally fail DAG (ONLY FOR TESTING) def fail_dag(): raise Exception("Intentional DAG failure for test") # Define DAG default_args = { "on_failure_callback": failure_callback, 'owner': 'Me', 'start_date': datetime(2021, 12, 9), 'is_prod': False } with DAG( dag_id='dag_name', default_args=default_args, schedule_interval=None, catchup=False, ) as dag: # To fail task intentionally fail_task = PythonOperator( task_id='fail_task', python_callable=fail_dag, ) # Dummy start and end tasks for DAG start_task = DummyOperator(task_id='start') end_task = DummyOperator(task_id='end') # Dependencies (chained) start_task >> fail_task >> end_task globals()[DAG_NAME] = dag [/code] Может быть, мне что-то не хватает за пределами кода Python? Подробнее здесь: [url]https://stackoverflow.com/questions/79168273/trying-to-trigger-an-aws-sns-topic-from-an-airflow-dag-failure-to-invoke-a-lambd[/url]