Как запускать DAG в Airflow каждый раз, когда состояние внешнего события равно True (запуск на основе событий)Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как запускать DAG в Airflow каждый раз, когда состояние внешнего события равно True (запуск на основе событий)

Сообщение Anonymous »

Базовая концепция Airflow не позволяет запускать даг через нерегулярный интервал.
На самом деле я хочу запускать даг каждый раз, когда новый файл размещается на удаленном сервере (например, HTTPS, sftp, s3. ..)
Но для Airflow требуется определенный data_interval. Используя, например. HttpSensor срабатывает только один раз в течение запланированного временного окна. В моем текущем примере я использую Redis для сохранения текущего состояния файла.

Код: Выделить всё

""" DAG for operational District heating """
import json
from datetime import datetime

import redis
import requests
from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator
from airflow.providers.http.sensors.http import HttpSensor

def check_up_for_new_file(
response: requests.models.Response,
) -> bool:
""" uses redis to check if a new file is on the server"""
current_header = {
key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
for key, value in response.headers.items()
}

conn = redis.Redis(host='redis', port=6379)
recent_header = conn.hgetall("header_dict")

recent_header = {
key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value
for key, value in recent_header.items()
}

if 'Content-Length' not in recent_header.keys():
conn.hmset("header_dict", current_header)
return False

if recent_header['Content-Length'] != current_header['Content-Length']:
conn.hmset("header_dict", current_header)
return True
else:
return False

default_args = {
'owner': 'airflow',
'depends_on_past': False,
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'concurrency': 6
}

with DAG(
dag_id='check_ext',
start_date=datetime(2022, 11, 24),
tags=['test'],
catchup=False,
default_args=default_args,
) as dag:
check_for_new_file = HttpSensor(
task_id='check_up_for_new_file',
http_conn_id='_conn_id',
endpoint='',
poke_interval=20,
dag=dag,
response_check=check_up_for_new_file
)
invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
task_id='run_process_with_external_files',
function_name='LAMBDA_FUNCTION',
payload=json.dumps({"source_type": "some stuff"}),
)
check_for_new_file >> invoke_lambda_function

Как этот даг перезапускается после успеха, чтобы еще раз проверить наличие новых файлов?


Подробнее здесь: https://stackoverflow.com/questions/745 ... ue-event-b
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    37 Просмотры
    Последнее сообщение Anonymous
  • Как запустить DAG при успехе другого DAG в Airflow с использованием Python?
    Anonymous » » в форуме Python
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Скопируйте проект Airflow в каталог Airflow DAG.
    Anonymous » » в форуме Python
    0 Ответы
    29 Просмотры
    Последнее сообщение Anonymous
  • Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
    Anonymous » » в форуме Python
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Запустите DAG на основе завершения DAG вверх по течению (незапланированный)
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»