Попытка запустить тему AWS SNS из-за сбоя Airflow DAG для вызова функции Lambda, которая создает билет JIRA для неудачноPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Попытка запустить тему AWS SNS из-за сбоя Airflow DAG для вызова функции Lambda, которая создает билет JIRA для неудачно

Сообщение Anonymous »

Я создал тестовый даг, который не работает при запуске тестирования, но я не могу активировать тему SNS после сбоя дага.
Что-то отсутствует или что-то добавлено, чтобы остановить его? от работы и запуска темы SNS? Это код Python, который я набрал, который должен запускать тему SNS при сбое DAG:

Код: Выделить всё

from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from pathlib import Path
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
import boto3
from airflow.providers.amazon.aws.hooks.sns import SnsHook

DAG_NAME = 'dag_name'

# Funciton which triggers SNS notification on failure
def failure_callback(context):
sns_notification = SnsPublishOperator(
sns_hook = SnsHook(sns_topic_arn='My Topic ARN'),
dag_id = context["dag"].dag_id,
task_id = context["task_instance"].task_id,
exception = str(context.get("exception")),
log_url = context["task_instance"].log_url
)

# Trigger SNS notification
sns_client = boto3.client('sns', region_name='eu-west-2')
sns_client.publish(
TopicArn='My Topic ARN',
Message=f"DAG {context['dag'].dag_id} failed on task {context['task_instance'].task_id}"
)

# Function to intentionally fail DAG (ONLY FOR TESTING)
def fail_dag():
raise Exception("Intentional DAG failure for test")

# Define DAG
default_args = {
"on_failure_callback": failure_callback,
'owner': 'Me',
'start_date': datetime(2021, 12, 9),
'is_prod': False
}

with DAG(
dag_id='dag_name',
default_args=default_args,
schedule_interval=None,
catchup=False,
) as dag:

# To fail task intentionally
fail_task = PythonOperator(
task_id='fail_task',
python_callable=fail_dag,
)

# Dummy start and end tasks for DAG
start_task = DummyOperator(task_id='start')
end_task = DummyOperator(task_id='end')

# Dependencies (chained)
start_task >> fail_task >> end_task

globals()[DAG_NAME] = dag
Если мне что-то не хватает за пределами кода Python, предложите это, и я рассмотрю это. Тот, кто поможет мне разобраться в этом, будет вознагражден, поскольку эта задача сейчас является источником всего моего стресса.

Подробнее здесь: https://stackoverflow.com/questions/791 ... ke-a-lambd
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»