Невозможно настроить ведение журнала Celery с помощью файла конфигурации YAML с сервером FastAPI.Python

Программы на Python
Ответить
Anonymous
 Невозможно настроить ведение журнала Celery с помощью файла конфигурации YAML с сервером FastAPI.

Сообщение Anonymous »

Я использую Celery в качестве обработчика фоновых задач с веб-сервером FastAPI. Ведение журнала для FastAPI настраивается с помощью файла logging.yaml через Uvicorn следующим образом:

Код: Выделить всё

uvicorn syncapp.main:app --workers 4 --host 0.0.0.0 --port 8084 --log-config logging.yaml
Я настроил несколько средств ведения журнала, которые отлично работают с сервером API. Однако, что бы я ни делал, я не могу заставить Celery использовать регистратор, определенный в файле logging.yaml. Я пробовал другие решения, но ни одно из них не помогло в моем случае. Структура каталогов приложения выглядит следующим образом:

Код: Выделить всё

myproj
|
|--- syncapp
|    |--- tasks
|    |    |--- copy_data.py  (this is a task)
|    |
|    |--- main.py  (FastAPI application entrypoint)
|    |--- worker.py  (Celery application entrypoint)
|
|--- logging.yaml
|--- pyproject.toml
Вот основное приложение Celery: (

Код: Выделить всё

file: syncapp/worker.py
)

Код: Выделить всё

from celery import Celery

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
main=settings.app_name,
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
include="syncapp.tasks.copy_data",
)

app.conf.update(
broker_connection_retry_on_startup=True,
worker_hijack_root_logger=False,
)
И вот как задача обращается к объекту журнала (

Код: Выделить всё

file: syncapp/tasks/copy_data.py
)

Код: Выделить всё

from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)

@app.task(queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(index_name: str) -> bool:
"""Copies index data up until the current date."""
logger.info("Copying data from source index %s", index_name)
return True
Содержимое файла logging.yaml (некоторые ключи удалены, поскольку они не связаны между собой):

Код: Выделить всё

version: 1

objects:
syncapp_queue:
class: queue.Queue
maxsize: 1000
celery_queue:
class: queue.Queue
maxsize: 1000

formatters:
simple:
format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s]: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"
extended:
format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s] - [%(module)s:%(lineno)d]: %(message)s"
datefmt: "%Y-%m-%d %H:%M:%S"

handlers:
console:
class: logging.StreamHandler
formatter: simple
stream: ext://sys.stdout

syncapp_file_handler:
class: logging.FileHandler
filename: logs/syncapp.log
formatter: extended

celery_file_handler:
class: logging.FileHandler
filename: logs/synctask.log
formatter: extended

syncapp_queue_handler:
class: logging_.handlers.QueueListenerHandler
handlers:
- cfg://handlers.console
- cfg://handlers.syncapp_file_handler
queue: cfg://objects.syncapp_queue

celery_queue_handler:
class: logging_.handlers.QueueListenerHandler
handlers:
- cfg://handlers.console
- cfg://handlers.celery_file_handler
queue: cfg://objects.celery_queue

loggers:
syncapp:
level: DEBUG
handlers:
- syncapp_queue_handler
propagate: no
syncapp.tasks:
level: DEBUG
handlers:
- celery_queue_handler
propagate: no
celery:
level: INFO
handlers:
- celery_queue_handler
propagate: no

root:
level: INFO
handlers:
- console
Я хочу, чтобы серверная часть Celery записывала свои события с помощью регистратора celery, а журналы задач — в регистратор syncapp.tasks. Основные журналы приложений должны проходить через регистратор syncapp (что и происходит).
Я пробовал сигналы настройки, но они не помогли. Например:

Код: Выделить всё

@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
logger = kwargs["logger"]
for handler in logger.handlers:
logger.removeHandler(handler)
with open("logging.yaml", "r") as config_file:
# Load logging configuration from YAML file
YAMLConfig(config_file.read(), silent=True)
logger = logging.getLogger("syncapp.tasks")
logger.info("Logging configuration loaded. Handlers: %s", logger.handlers)
kwargs["logger"] = logger
Фактически это вызывает ошибку в журнале консоли Celery

Код: Выделить всё

syncdata-1  | Exception in thread Thread-1 (_monitor):
syncdata-1  | Traceback (most recent call last):
syncdata-1  |   File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
syncdata-1  |     self.run()
syncdata-1  |   File "/usr/local/lib/python3.11/threading.py", line 975, in run
syncdata-1  |     self._target(*self._args, **self._kwargs)
syncdata-1  |   File "/usr/local/lib/python3.11/logging/handlers.py", line 1584, in _monitor
syncdata-1  |     self.handle(record)
syncdata-1  |   File "/usr/local/lib/python3.11/logging/handlers.py", line 1563, in handle
syncdata-1  |     process = record.levelno >= handler.level
syncdata-1  |                                 ^^^^^^^^^^^^^
syncdata-1  | AttributeError: 'ConvertingDict' object has no attribute 'level'
С Flask это было не очень сложно. Но здесь я не могу найти способ заставить его выводить журналы в указанные файлы.

Подробнее здесь: https://stackoverflow.com/questions/790 ... th-fastapi
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»