моя среда: < /strong> < /p>
- Airflow: Apache-airflow == 3.0.2 < /li>
virders: apache-airflow == 3.0.2 < /li>
. Apache-airflow-providers-celery == 3.12.0 - Исполнитель: CeleryExecutor
минимальный тестовый пример
Чтобы изолировать проблему, я удалил все остальные файлы из моего папки Dags, оставив только два простых файла:
dags/simple_task.py
Код: Выделить всё
import logging
from airflow.providers.celery.executors.celery_executor import app
log = logging.getLogger(__name__)
@app.task
def my_simple_test_task(message):
"""A minimal task that only logs a message."""
log.info("SUCCESS! The simple task ran with message: %s", message)
Код: Выделить всё
from __future__ import annotations
import pendulum
from airflow.decorators import dag, task
from simple_task import my_simple_test_task
@dag(
dag_id='minimal_celery_test',
schedule=None,
start_date=pendulum.now(),
catchup=False
)
def minimal_celery_test_dag():
@task
def trigger_the_simple_task():
my_simple_test_task.delay("Testing Celery import.")
trigger_the_simple_task()
minimal_celery_test_dag()
< /code>
[b] Стадии конфигурации и отладки < /strong>
my airflow.cfg настроен на импорт этого модуля: < /p>
Airflow.cfg
[celery]
imports = simple_task
[*] Службы жесткого сброса [/b]: полностью останавливая запланировщик воздушного потока и раздачу в воздушном потоке stronge> stronge> stronge> stronge> strong> stronge> stronge> stronge> world> deltating> stronge>
[*] и .pyc -файлы из моего проекта.
perviciping file location : Обеспечение как simple_task.py, так и test_dag.py находятся непосредственно в папке Dags, на которую ссылаются в конфигурации. trigger_the_simple_task Задача отправляет задание, но она сразу не стерла (как я вижу его на панели панели цветов) на работнике со следующей ошибкой: < /p>
Код: Выделить всё
NotRegistered('simple_task.my_simple_test_task')
Когда я проверяю журналы запуска работника сельдерея, в разделе [tasks] только указаны задачи воздушного потока по умолчанию; my_simple_test_task отсутствует, что подтверждает, что он не зарегистрирован. Есть ли какие-либо другие известные проблемы, факторы окружающей среды или конфигурации, специфичные для воздушного потока 3, которые могут привести к этому поведению?
Может быть, есть другой способ достичь такого поведения? Понимание его дизайна разъясняет, почему мне нужен этот конкретный шаблон воздушного потока для работы.
Код: Выделить всё
daily_crawler_dag.py
Задача сельдерея (
Код: Выделить всё
html_acquisition.py
Наиболее важная задача - приобрести распределенный redis lock < /strong> для каждого домена
url, прежде чем сделать запрос. Это обеспечивает соблюдение вежливости
, обеспечивая отправляется только один запрос на конкретный веб -сайт за раз. Отделяет оркестровку от трудоемкого ползания. Использование автономной задачи сельдерея, называемой через .delay () , имеет важное значение для основанного на Redis вежливости и контроля параллелизма. Вот почему разрешение ошибки нерегистрированного является критическим блокировщиком для всей системы.
Подробнее здесь: https://stackoverflow.com/questions/796 ... -celery-im