Как запустить задачу на определенное время?Python

Программы на Python
Ответить
Anonymous
 Как запустить задачу на определенное время?

Сообщение Anonymous »

Я создаю группу обеспечения доступности баз данных в Airflow, и внутри этой группы обеспечения доступности баз данных мне нужно запустить другую группу обеспечения доступности баз данных с помощью TimeSensor. Цель состоит в том, чтобы установить целевое время между 2:00 и 3:00 ночи, и если датчик времени срабатывает после 2:00 ночи, он должен ждать до 2:00 ночи следующего дня. Однако я столкнулся со следующей ошибкой:

Код: Выделить всё

TypeErrors not supported between instances of 'datetime time' and 'datetime datetime"
Мой код;

Код: Выделить всё

  from airflow.models import DAG
from datetime import timedelta, datetime
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.operators.python_operator import PythonOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.sensors.time_sensor import TimeSensor
from datetime import datetime, time
from airflow.utils.state import State
from airflow.models import DagRun
import pytz
import pendulum

def calculate_next_target_time():
now = pendulum.now('Europe/Istanbul')
target_time = now.replace(hour=2, minute=0, second=0, microsecond=0)
if now >= target_time:
target_time = target_time.add (days=1)
return target_time
def wait_for_time():
next_target_time = calculate_next_target_time()
print(f"Next target time: (next_target_time}")
return next_target_time

with DAG(
'_dag',
default_args={
'depends_on_past' : False,
'email': [''],
'email_on_failure': False,
'email_on_retry': False,},
schedule_interval= None,
start_date=datetime (2021, 1, 1), catchup=False) as dag:
dummy_task= SSHOperator (
task_id= "dummy_task"
command =  "dummy-task"
ssh_conn_id = "dummy",)

wait_for_2am = TimeSensor (
task_id= 'wait_for_2am',                      poke_interval=60,
timeout=3000,
mode= 'poke',
target_time=calculate_next_target_time(),
dag=dag)

wait_for_2am_task = PythonOperator (
task_id= "wait_for_2am_task"
python_callable=wait_for_time,
dag=dag)

service_trigger = TriggerDagRunOperator (
task_id= "service_trigger",
trigger_dag_id= "other_dag",)

dummy_task » wait_for_2am_task » wait_for_2am » service_trigger
как мне решить эту проблему. Датчик времени доступен только для времени (например, часов).

Подробнее здесь: https://stackoverflow.com/questions/793 ... cific-time
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»