Code:
uvicorn syncapp.main:app --workers 4 --host 0.0.0.0 --port 8084 --log-config logging.yaml
Code:
myproj
|
|--- syncapp
|    |--- tasks
|    |    |--- copy_data.py (this is a task)
|    |
|    |--- main.py (FastAPI application entrypoint)
|    |--- worker.py (Celery application entrypoint)
|
|--- logging.yaml
|--- pyproject.toml
file: syncapp/worker.py
from celery import Celery

from syncapp.settings import get_settings

settings = get_settings()

app = Celery(
    main=settings.app_name,
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include="syncapp.tasks.copy_data",
)
app.conf.update(
    broker_connection_retry_on_startup=True,
    worker_hijack_root_logger=False,
)
file: syncapp/tasks/copy_data.py
from celery.utils.log import get_task_logger

from syncapp.commons.enums import WorkerQueue
from syncapp.worker import app

logger = get_task_logger(__name__)


@app.task(queue=WorkerQueue.COPY_DATA.value)
def copy_index_data(index_name: str) -> bool:
    """Copies index data up until the current date."""
    logger.info("Copying data from source index %s", index_name)
    return True
Code:
version: 1
objects:
  syncapp_queue:
    class: queue.Queue
    maxsize: 1000
  celery_queue:
    class: queue.Queue
    maxsize: 1000
formatters:
  simple:
    format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s]: %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"
  extended:
    format: "[%(asctime)s.%(msecs)03d] [pid %(process)d] [%(levelname)s] - [%(module)s:%(lineno)d]: %(message)s"
    datefmt: "%Y-%m-%d %H:%M:%S"
handlers:
  console:
    class: logging.StreamHandler
    formatter: simple
    stream: ext://sys.stdout
  syncapp_file_handler:
    class: logging.FileHandler
    filename: logs/syncapp.log
    formatter: extended
  celery_file_handler:
    class: logging.FileHandler
    filename: logs/synctask.log
    formatter: extended
  syncapp_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.console
      - cfg://handlers.syncapp_file_handler
    queue: cfg://objects.syncapp_queue
  celery_queue_handler:
    class: logging_.handlers.QueueListenerHandler
    handlers:
      - cfg://handlers.console
      - cfg://handlers.celery_file_handler
    queue: cfg://objects.celery_queue
loggers:
  syncapp:
    level: DEBUG
    handlers:
      - syncapp_queue_handler
    propagate: no
  syncapp.tasks:
    level: DEBUG
    handlers:
      - celery_queue_handler
    propagate: no
  celery:
    level: INFO
    handlers:
      - celery_queue_handler
    propagate: no
root:
  level: INFO
  handlers:
    - console
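For comparison, the queue-based routing that this configuration describes can also be wired directly in code with the standard library's `QueueHandler` and `QueueListener`, which does not depend on how `cfg://` references are resolved. This is only a minimal sketch: it assumes `logging_.handlers.QueueListenerHandler` is a wrapper around that stdlib pair (the source does not show it), and `build_queue_logging` is a hypothetical helper name, not part of any library.

```python
import logging
import logging.handlers
import queue
import sys


def build_queue_logging(
    logger_name: str,
    targets: list[logging.Handler],
    maxsize: int = 1000,
) -> logging.handlers.QueueListener:
    """Attach a QueueHandler to the named logger and return a started listener.

    Hypothetical helper: the listener thread forwards queued records to `targets`.
    """
    log_queue: queue.Queue = queue.Queue(maxsize=maxsize)

    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # mirrors "propagate: no" in the YAML
    logger.addHandler(logging.handlers.QueueHandler(log_queue))

    listener = logging.handlers.QueueListener(
        log_queue, *targets, respect_handler_level=True
    )
    listener.start()
    return listener


if __name__ == "__main__":
    console = logging.StreamHandler(sys.stdout)
    console.setFormatter(logging.Formatter("[pid %(process)d] [%(levelname)s]: %(message)s"))
    listener = build_queue_logging("syncapp.tasks", [console])
    logging.getLogger("syncapp.tasks").info("Copying data from source index %s", "demo")
    listener.stop()  # flushes the queue and joins the listener thread
```

Because the target handlers are real `logging.Handler` instances by the time the listener is created, the listener never sees an unresolved configuration mapping.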
I tried the logging setup signals, but they did not help. For example:
Code:
import logging

from celery.signals import after_setup_logger

# The import of YAMLConfig is not shown in the original snippet.


@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
    logger = kwargs["logger"]
    # Iterate over a copy: removing handlers from the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
    with open("logging.yaml", "r") as config_file:
        # Load logging configuration from YAML file
        YAMLConfig(config_file.read(), silent=True)
    logger = logging.getLogger("syncapp.tasks")
    logger.info("Logging configuration loaded. Handlers: %s", logger.handlers)
    kwargs["logger"] = logger
Code:
syncdata-1 | Exception in thread Thread-1 (_monitor):
syncdata-1 | Traceback (most recent call last):
syncdata-1 | File "/usr/local/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
syncdata-1 | self.run()
syncdata-1 | File "/usr/local/lib/python3.11/threading.py", line 975, in run
syncdata-1 | self._target(*self._args, **self._kwargs)
syncdata-1 | File "/usr/local/lib/python3.11/logging/handlers.py", line 1584, in _monitor
syncdata-1 | self.handle(record)
syncdata-1 | File "/usr/local/lib/python3.11/logging/handlers.py", line 1563, in handle
syncdata-1 | process = record.levelno >= handler.level
syncdata-1 | ^^^^^^^^^^^^^
syncdata-1 | AttributeError: 'ConvertingDict' object has no attribute 'level'
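The `AttributeError` indicates that the listener thread was handed a configuration mapping where it expected a handler instance. One possible cause, assuming the custom handler simply stores whatever `dictConfig` passes as its `handlers` argument: `dictConfig` configures handlers in sorted name order, and a `cfg://handlers.X` reference that is resolved before `X` has been configured yields the raw config dict for `X`. The sketch below reproduces that behavior with the standard library only; `WrapperHandler` and `resolve_target` are hypothetical names used for illustration.

```python
import logging
import logging.config


class WrapperHandler(logging.Handler):
    """Hypothetical stand-in for a handler that receives another handler via cfg://."""

    def __init__(self, target):
        super().__init__()
        self.target = target  # whatever dictConfig resolved the cfg:// reference to


def resolve_target(wrapper_name: str, console_name: str):
    """Configure two handlers and return what the wrapper received as its target."""
    config = {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            wrapper_name: {
                "()": WrapperHandler,
                "target": f"cfg://handlers.{console_name}",
            },
            console_name: {"class": "logging.StreamHandler"},
        },
        "loggers": {"cfg_demo": {"level": "INFO", "handlers": [wrapper_name]}},
    }
    logging.config.dictConfig(config)
    return logging.getLogger("cfg_demo").handlers[0].target


if __name__ == "__main__":
    # Wrapper sorts before the handler it references: it gets a config mapping.
    early = resolve_target("a_wrapper", "z_console")
    print(type(early).__name__, isinstance(early, logging.Handler))

    # Wrapper sorts after the handler it references: it gets a real handler.
    late = resolve_target("z_wrapper", "a_console")
    print(type(late).__name__, isinstance(late, logging.Handler))
```

In the YAML above, `celery_queue_handler` sorts before `console`, while `syncapp_queue_handler` sorts after both handlers it references. That would be consistent with only the Celery listener failing, but it is an inference from the configuration, not something the traceback confirms.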
More details here: https://stackoverflow.com/questions/790 ... th-fastapi