Проблема в приложении django с использованием RabbitMQ в качестве брокера и сельдерея для запуска простой задачиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Проблема в приложении django с использованием RabbitMQ в качестве брокера и сельдерея для запуска простой задачи

Сообщение Anonymous »

Я пытаюсь узнать, как настроить приложение Django для использования сельдерея. Я запускаю RabbitMQ на рабочем столе Docker на компьютере с Windows с помощью этой команды:

Код: Выделить всё

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq
затем я запускаю сельдерей с помощью этой команды и отслеживаю его с помощью цветка:

Код: Выделить всё

celery -A a_shop worker -l Info
celery -A a_shop flower
В основном файле инициализации приложения django у меня есть это:

Код: Выделить всё

from .celery import app as celery_app

__all__ = ('celery_app',)
а для настроек сельдерея в файле settings.py у меня есть следующее:

Код: Выделить всё

CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_SOFT_TIME_LIMIT = 600  # 10 minutes
CELERY_TASK_TIME_LIMIT = 1200
CELERY_IMPORTS = ('orders.task',)
CELERY_TASK_DEFAULT_QUEUE = 'celery'
Я создал файл с именем celery.py в главном приложении следующим образом:

Код: Выделить всё

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'a_shop.settings')

app = Celery('a_shop')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
print('Request: {self.request!r}')
Я создал простую задачу для проверки:

Код: Выделить всё

import logging
from celery import shared_task
from .models import Order
logger = logging.getLogger(__name__)

@shared_task
def order_created(order_id):
logger.info(f"Task order_created started with order_id: {order_id}")
try:
order = Order.objects.get(id=order_id)
logger.info(f"Processing order: {order.name}")
print(order_id)
except Exception as e:
logger.error(f"Error processing order {order_id}: {e}")
raise
logger.info(f"Task order_created completed with order_id: {order_id}")
return order_id
Я выполняю задачу в одном из моих представлений с помощью:

Код: Выделить всё

order_created.delay(new_order.id)
Я вижу полученную задачу в консоли сельдерея:

Код: Выделить всё

[2024-06-20 15:59:43,220: INFO/MainProcess] Task orders.task.order_created[dbbcafd1-55f8-4e86-ae48-1c68fda5c75f] received
[2024-06-20 15:59:44,289: INFO/SpawnPoolWorker-9] child process 10608 calling self.run()
[2024-06-20 15:59:44,291: INFO/SpawnPoolWorker-10] child process 744 calling self.run()
но через некоторое время получаю на консоли вот такую ​​ошибку:

Код: Выделить всё

[2024-06-20 05:01:48,262: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery
acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms.  This timeout value can be configured, see consumers d
oc guide to learn more', (0, 0), '')
Traceback (most recent call last):
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 365, in start
return self.obj.start()
^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
blueprint.start(self)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 628, in start
c.loop(*c.loop_args())
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\loops.py", line 130, in synloop
connection.drain_events(timeout=2.0)
File "E:\Projects\app\a_shop\Lib\site-packages\kombu\connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\kombu\transport\pyamqp.py", line 171, in drain_events
return connection.drain_events(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 532, in blocking_read
return self.on_inbound_frame(frame)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 538, in on_inbound_method
return self.channels[channel_id].dispatch_method(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
listener(*args)
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\channel.py", line 293, in _on_close
raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Time
out value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
В чем проблема и как ее исправить?

Подробнее здесь: https://stackoverflow.com/questions/786 ... ning-simpe
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»