Базовая концепция Airflow не позволяет запускать даг через нерегулярный интервал.
На самом деле я хочу запускать даг каждый раз, когда новый файл размещается на удаленном сервере (например, HTTPS, sftp, s3. ..)
Но для Airflow требуется определенный data_interval. Используя, например. HttpSensor срабатывает только один раз в течение запланированного временного окна. В моем текущем примере я использую Redis для сохранения текущего состояния файла.
Базовая концепция Airflow не позволяет запускать даг через нерегулярный интервал. На самом деле я хочу запускать даг каждый раз, когда новый файл размещается на удаленном сервере (например, HTTPS, sftp, s3. ..) Но для Airflow требуется определенный data_interval. Используя, например. HttpSensor срабатывает только один раз в течение запланированного временного окна. В моем текущем примере я использую Redis для сохранения текущего состояния файла. [code]""" DAG for operational District heating """ import json from datetime import datetime
import redis import requests from airflow import DAG from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator from airflow.providers.http.sensors.http import HttpSensor
def check_up_for_new_file( response: requests.models.Response, ) -> bool: """ uses redis to check if a new file is on the server""" current_header = { key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value for key, value in response.headers.items() }
recent_header = { key.decode() if isinstance(key, bytes) else key: value.decode() if isinstance(value, bytes) else value for key, value in recent_header.items() }
if 'Content-Length' not in recent_header.keys(): conn.hmset("header_dict", current_header) return False
У меня есть родительское задание DAG и дочернее задание DAG. Задачи дочернего задания должны запускаться при успешном завершении задач родительского задания, которые выполняются ежедневно. Как добавить внешний триггер задания?
У меня есть родительское задание DAG и дочернее задание DAG. Задачи дочернего задания должны запускаться при успешном завершении задач родительского задания, которые выполняются ежедневно. Как добавить внешний триггер задания?
Я пытаюсь увидеть, смогу ли я запустить DAG (скажем, B1), чтобы запустить (ежедневно), если пробег вверх DAG (скажем, A1). Я уже знаю о ExternalTaskSensor, но мое ограничение состоит в том, что вверх по течению DAG U1 не распланирован, то есть...
Я пытаюсь увидеть, смогу ли я запустить DAG (скажем, B1), чтобы запустить (ежедневно), если пробег вверх по течению (скажем, A1). Я уже знаю о ExternalTaskSensor, но мое ограничение состоит в том, что вверх по течению DAG U1 не расписан, то есть...