Celery возвращает неверную информацию о текущих задачах в одном работникеPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Celery возвращает неверную информацию о текущих задачах в одном работнике

Сообщение Anonymous »

У меня есть пакет, который содержит Celery и RabbitMQ для задач и приложение FastApi для веб-запросов.

Приложение celery запускается из командной строки с помощью celery -A celery_app worker -l info -P gevent.

Rabbit развертывается в Docker-контейнере.

FastApi запускается из скрипта Python.
Вот код. Вопрос ниже.
fastapi_app/main.py

Код: Выделить всё

from __future__ import absolute_import
from fastapi import FastAPI
from celery.result import AsyncResult
from celery_app.tasks import task, get_current_tasks
from celery_app.celery import c_app
from fastapi_app.model import Worker, Task

f_app = FastAPI()

@f_app.post("/task/")
def run_task():
_task = task.apply_async()
return {"task_id": _task.id}

@f_app.get("/task_info/{task_id}")
def get_progress(task_id):
result = AsyncResult(task_id, app=c_app)
return Task(id=task_id, state=result.state, meta=result.info)

@f_app.get("/curr_progress/")
def get_current_progress():
response = {'workers': []}
for worker, tasks_list in get_current_tasks().items():
worker_tasks_id = [task_.get('id') for task_ in tasks_list]
worker_ = Worker(name=worker)
for id_ in worker_tasks_id:
result = AsyncResult(id_, app=c_app)
worker_.tasks.append(Task(id=id_, state=result.state, meta=result.info))
response['workers'].append(worker_)
return response

if __name__ == "__main__":
import uvicorn
uvicorn.run(f_app, host="localhost", port=8000)

fastapi_app/model.py

Код: Выделить всё

from pydantic import BaseModel
from typing import List, Any, Optional

class Task(BaseModel):
id: Optional[str] = None
state: Optional[str] = None
meta: dict | Any | None = None

class Worker(BaseModel):
name: Optional[str] = None
tasks: List[Task] = list()
celery_app/tasks.py

Код: Выделить всё

from __future__ import absolute_import

import threading
import time
from celery_app.celery import c_app

def get_current_tasks() -> dict:
i = c_app.control.inspect()
return i.active()

def get_registered_tasks() -> dict:
i = c_app.control.inspect()
return i.registered()

@c_app.task(bind=True)
def task(self):
print(f'task started in {threading.current_thread()}. Thread alive:')
for i in threading.enumerate():
print(i)

n = 60
for i in range(0, n):
self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
time.sleep(1)
print(f'task finished in {threading.current_thread()}\n')
return n
celery_app/celery.py

Код: Выделить всё

from __future__ import absolute_import
from celery import Celery

c_app = Celery('celery_app',
broker='amqp://guest:guest@localhost',
backend='rpc://',
include=['celery_app.tasks'])

Там, в fastapi_app/main.py у меня есть функция, которая запускает задачу run_task() и функция, которая получает текущий прогресс всех выполнение задач get_current_progress().

Последний зависит от celery.result.AsyncResult(), который зависит от метода update_state() в задаче (self) в celery_app/tasks.py.
Вот в чем проблема. Когда я запускаю только одну задачу, запрашивая сервер FastApi, ход выполнения задачи отображается правильно.

Код: Выделить всё

{
"workers": [
{
"name": "celery@wsmsk1n3075",
"tasks": [
{
"id": "271531c2-48e6-4c71-a9ef-31bce434c649",
"state": "PROGRESS",
"meta": {
"done": 3,
"total": 60
}
}
]
}
]
}
Но когда я запускаю несколько задач (отправляю пару таск-запросов на сервер FastApi), отображение становится некорректным. Особенно метаинформация о прогрессе.

Код: Выделить всё

{
"workers": [
{
"name": "celery@wsmsk1n3075",
"tasks": [
{
"id": "4d05d0f0-f058-4372-8eec-c84853188655",
"state": "PROGRESS",
"meta": {
"done": 9,
"total": 60
}
},
{
"id": "0ca82db4-7e04-4bfd-9d73-6a190decd4c6",
"state": "PROGRESS",
"meta": null
},
{
"id": "ba8aa34b-e185-47cf-bed3-f3ca07257afc",
"state": "PROGRESS",
"meta": null
},
{
"id": "3e2941f5-1285-4062-aea0-31a3c9b1cc21",
"state": "PROGRESS",
"meta": null
}
]
}
]
}
Важно отметить, что в celery все 4 задачи выполняются в отдельных потоках:

Код: Выделить всё

[2024-07-23 13:02:06,427: WARNING/MainProcess] 
[2024-07-23 13:02:06,427: WARNING/MainProcess] 
[2024-07-23 13:02:06,428: WARNING/MainProcess] 
[2024-07-23 13:02:06,429: WARNING/MainProcess] 
Итак, у них должен быть свой собственный прогресс (который, похоже, не такой, как вы можете видеть в последнем блоке json).
Как Могу ли я получить каждое состояние задачи в случае с несколькими задачами, как это было в примере с одиночной задачей?

Подробнее здесь: https://stackoverflow.com/questions/787 ... one-worker
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»