- Консольное средство ведения журнала (выводит сообщения на стандартный вывод)
- Файловое средство ведения журнала (регистрирует структурированные сообщения JSON в файл output.log)
Моя настройка выглядит примерно так:
- создает:
Код: Выделить всё
setup_logging()
мультипроцессорную очередь - консоль и средства ведения журнала JSON
- консоль и обработчики JSON
- QueueListener с обоими обработчиками
Код: Выделить всё
worker(task)[*]Задачи отправляются через ThreadPoolExecutor.
Код: Выделить всё
import logging
import logging.handlers
import multiprocessing
import concurrent.futures
import json
# ------------------------------------------------------------
# Setup logging
# ------------------------------------------------------------
def setup_logging():
log_queue = multiprocessing.Manager().Queue(-1)
# Console logger
console_logger = logging.getLogger("console_logger")
console_logger.setLevel(logging.DEBUG)
console_logger.addHandler(logging.handlers.QueueHandler(log_queue))
# JSON logger
json_logger = logging.getLogger("json_logger")
json_logger.setLevel(logging.INFO)
json_logger.addHandler(logging.handlers.QueueHandler(log_queue))
# Console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter("%(message)s"))
# JSON file handler
json_handler = logging.FileHandler("path/to/json.log")
json_handler.setFormatter(logging.Formatter("%(message)s"))
# Queue listener
listener = logging.handlers.QueueListener(
log_queue,
console_handler,
json_handler,
)
return listener, console_logger, json_logger
# ------------------------------------------------------------
# Worker function
# ------------------------------------------------------------
def worker(task, console_logger, json_logger):
try:
# perform some task
result = perform_task(task)
# log JSON result to JSON logger
json_logger.info(json.dumps(result))
# log human-readable message to console logger
console_logger.info("Task completed for: %s", task)
except Exception as e:
console_logger.error("Error processing task %s: %s", task, e)
# ------------------------------------------------------------
# Main program
# ------------------------------------------------------------
if __name__ == "__main__":
listener, console_logger, json_logger = setup_logging()
listener.start()
tasks = get_task_list() # abstracted task list
# Threaded execution
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = [
executor.submit(worker, task, console_logger, json_logger)
for task in tasks
]
# wait for all threads to finish
for f in futures:
f.result()
listener.stop()
- Правильно ли это настроить QueueHandler + QueueListener с несколькими регистраторами?
- Каков рекомендуемый шаблон для безопасного разделения выходных данных JSON и консоли в многопоточной программе Python?
Мобильная версия