О задаче - это просто цикл, в котором мы обновить состояние с помощью Task.update_state().
Код: Выделить всё
@c_app.task(bind=True)
def task(self):
n = 20
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
print('working')
time.sleep(1)
return n
- Celery-beat сервис, который создает 1000 задач (сумма для примера).
- Сервис FastAPI, предоставляющий две конечные точки:
создать задачу с приоритетом TOP и добавить ее в основную очередь. - получить актуальную информацию об активной задаче и запланированных задачах (с помощью проверки())
Итак, в FastAPI можно спросить о:
- текущая активная задача — Inspect().active()
сколько задач осталось — Inspect().scheduled()
Вопрос: Как можно Я добавляю в очередь задачу с более высоким приоритетом, которая уже запланировала задачи для работника?
Вот что я пробовал:
Конфигурация сельдерея:
Код: Выделить всё
from celery.schedules import crontab
from kombu import Queue, Exchange
broker_url = 'amqp://guest:guest@localhost//'
result_backend = 'db+postgresql://admin:root@localhost/celery_test_db'
worker_concurrency = 1
timezone = 'Europe/Moscow'
enable_utc = False
result_extended = True
beat_schedule = {
'add-5-tasks-every-month': {
'task': 'celery_app.tasks.add_5_tasks',
'options': {'queue': 'celery_q'},
'schedule': 20.0
},
}
broker_transport_options = {'queue_order_strategy': 'priority'}
task_queues = (
Queue("celery_q", Exchange("celery_q"), routing_key="celery_q", queue_arguments={'x-max-priority': 9}),
)
Код: Выделить всё
@c_app.task
def add_5_tasks():
for _ in range(800):
task.apply_async(countdown=1, queue='celery_q', priority=1)
Код: Выделить всё
@f_app.post("/add-task/")
def add_task():
task_ = task.apply_async(priority=9, countdown=1, queue='celery_q')
print('Task added with high priority:', task_.id)
return {'task_id': task_.id,
'message': 'Task added with high priority'}
Код: Выделить всё
i = c_app.control.inspect()
scheduled = i.scheduled()
reserved = i.reserved()
active = i.active()
Это работает, только если я добавляю эти настройки в конфигурацию :
Код: Выделить всё
worker_prefetch_multiplier = 1
task_acks_late = True
ГЛАВНЫЙ ВОПРОС: Как включить расстановку приоритетов и получить всю информацию о запланированных задачах из `inspect().scheduled()?
Подробнее здесь: https://stackoverflow.com/questions/788 ... -disabling