Как добавить приоритетную задачу в очередь сельдерея без отключенияPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как добавить приоритетную задачу в очередь сельдерея без отключения

Сообщение Anonymous »

У меня есть рабочий процесс сельдерея с параллелизмом, установленным на 1, который принимает задачи из RabbitMQ. Я хочу создать систему только с одной очередью в единой настройке параллелизма, чтобы все задачи были добавлены в основную очередь.
О задаче - это просто цикл, в котором мы обновить состояние с помощью Task.update_state().

Код: Выделить всё

@c_app.task(bind=True)
def task(self):
n = 20
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
print('working')
time.sleep(1)
return n
Параллельно у меня есть два сервиса.
  • Celery-beat сервис, который создает 1000 задач (сумма для примера).
  • Сервис FastAPI, предоставляющий две конечные точки:

    создать задачу с приоритетом TOP и добавить ее в основную очередь.
  • получить актуальную информацию об активной задаче и запланированных задачах (с помощью проверки())

Итак, в FastAPI можно спросить о:
  • текущая активная задача — Inspect().active()
    сколько задач осталось — Inspect().scheduled()
В этой части нет ничего интересного, поэтому без кода.
Вопрос: Как можно Я добавляю в очередь задачу с более высоким приоритетом, которая уже запланировала задачи для работника?
Вот что я пробовал:
Конфигурация сельдерея:

Код: Выделить всё

from celery.schedules import crontab
from kombu import Queue, Exchange

broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:root@localhost/celery_test_db'

worker_concurrency = 1
timezone = 'Europe/Moscow'
enable_utc = False
result_extended = True

beat_schedule = {
'add-5-tasks-every-month': {
'task': 'celery_app.tasks.add_5_tasks',
'options': {'queue': 'celery_q'},
'schedule': 20.0
},
}

broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
)
Вот моя задача Celery-Beat для добавления большого количества задач с низким приоритетом:

Код: Выделить всё

@c_app.task
def add_5_tasks():
for _ in range(800):
task.apply_async(countdown=1, queue='celery_q', priority=1)
Вот моя конечная точка FastAPI для добавления задачи с высоким приоритетом, которая, как я ожидаю, должна выполняться сразу после завершения текущей задачи.

Код: Выделить всё

@f_app.post("/add-task/")
def add_task():
task_ = task.apply_async(priority=9, countdown=1, queue='celery_q')
print('Task added with high priority:', task_.id)
return {'task_id': task_.id,
'message': 'Task added with high priority'}
И «ядро» конечной точки «current_progress», которая возвращает текущий прогресс и запланированные задачи:

Код: Выделить всё

i = c_app.control.inspect()
scheduled = i.scheduled()
reserved = i.reserved()
active = i.active()
Проблема: расстановка приоритетов работает не так, как я ожидал.

Это работает, только если я добавляю эти настройки в конфигурацию :

Код: Выделить всё

worker_prefetch_multiplier = 1
task_acks_late = True
Однако это приводит к тому, что replace().scheduled() становится абсолютно бесполезным, поскольку мы извлекаем только одну задачу подряд, поэтому работник думает, что у нас есть только одна задача в графике. Таким образом, вместо списка задач мы видим одну задачу в Inspect().scheduled()
ГЛАВНЫЙ ВОПРОС: Как включить расстановку приоритетов и получить всю информацию о запланированных задачах из `inspect().scheduled()?

Подробнее здесь: https://stackoverflow.com/questions/788 ... -disabling
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Ошибка при отправке потока строк в приоритетную очередь
    Гость » » в форуме C++
    0 Ответы
    11 Просмотры
    Последнее сообщение Гость
  • Обрезать приоритетную очередь NGenerics
    Anonymous » » в форуме C#
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Как сделать более компактной приоритетную очередь?
    Anonymous » » в форуме Python
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • Как сделать более компактной приоритетную очередь?
    Anonymous » » в форуме Python
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous
  • Как сделать более компактной приоритетную очередь?
    Anonymous » » в форуме Python
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»