Проблемы с Celery + Redis в Flask: превышен лимит повторных попыток и ошибки подключения KombuPython

Программы на Python
Ответить
Anonymous
 Проблемы с Celery + Redis в Flask: превышен лимит повторных попыток и ошибки подключения Kombu

Сообщение Anonymous »

Я занимаюсь рефакторингом приложения Flask, которое получает видеоданные вместе с некоторыми параметрами. Я решил добавить Celery для контролируемой фоновой обработки и Redis для передачи параметров. Рабочий процесс:
  • Сохраните загруженное видео во входную папку.
  • Передайте путь к видео и дополнительные параметры задаче Celery.
Однако каждый раз, когда я пытаюсь запустить задачу, я сталкиваюсь с одной из двух ошибок:
  • Превышен лимит повторных попыток при попытке повторного подключения к серверной части хранилища результатов Celery Redis.
    • Сообщение: Приложение Celery необходимо перезапустить.
  • Другая ошибка подключения, связанная с Kombu.
Я просматривал документацию Flask по фоновым задачам с помощью Celery, но не могу понять, что происходит.
Моя среда:
  • Последняя версия Flask
  • 5.6.2 (восстановление)
  • Redis Сервер v=6.2.2 64 бита
  • Python 3.11.9
Что я пробовал:
  • Перезапуск рабочего Celery
  • Проверка работы Redis и доступен
  • Точно в соответствии с руководством по Flask Celery
Вопрос:

Что может быть причиной того, что Celery не может подключиться к Redis с этими ошибками? Существуют ли какие-либо общие проблемы с конфигурацией или шаблоны Flask + Celery + Redis, которые могут привести к такому поведению?
#app/extensions.py

def celery_init_app(app: Flask) -> Celery:
class FlaskTask(Task):
def __call__(self, *args: object, **kwargs: object) -> object:
with app.app_context():
return self.run(*args, **kwargs)

celery_app = Celery(app.name, task_cls=FlaskTask)
celery_app.config_from_object(app.config["CELERY"])
celery_app.set_default()
app.extensions["celery"] = celery_app
return celery_app

#app/__init__.py

def create_app() -> Flask:
app = Flask(__name__)
CORS(app)

app.config.from_mapping(
CELERY=dict(
broker_url="redis://localhost:6379",
result_backend="redis://localhost:6379",
task_ignore_result=True,
),
)
app.config.from_prefixed_env()

celery_init_app(app)

from .routes import bp as api
app.register_blueprint(api)

return app

# app/routes.py

task = cut_point_task.delay(args)
print("celery started")
print(task.id

# app/tasks.py

@shared_task(base=AbortableTask, abortable=True, ignore_result=False)
def cut_point_task(args):
print("Celery Task Started")
time.sleep(30) # this should be a method that will use those args
print("Celery Task Finished")
return 0

#run.py

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

if __name__ == "__main__":
flask_app.run(host="0.0.0.0", port=5000, debug=True)

сельдерей работает
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/

...

[tasks]
. app.tasks.cut_point_task



Подробнее здесь: https://stackoverflow.com/questions/798 ... nnection-e
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»