Ведение журнала Python в один файл в ProcessPoolExecutor внутри рабочих процессов с QueueHandlerPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Ведение журнала Python в один файл в ProcessPoolExecutor внутри рабочих процессов с QueueHandler

Сообщение Anonymous »

Я хочу добавить журналы в один файл от разных процессов, содержащих ProcessPoolExectuor.
Мое приложение имеет следующую структуру:
  • Основной процесс
  • Вторичный процесс с пулом процессов: ProcessPoolExecutor
Оба вторичный процесс и пул процессов остаются открытыми все время, пока приложение не закроется.
Приведенный ниже пример представляет собой пример архитектуры моего приложения. Я пытаюсь настроить очередь многопроцессорной обработки для обработки всех журналов, поступающих от процесса и PoolProcessExecutor, чтобы доступ к файлу был безопасным.
Я пытался использовать пример из документации Python, и добавил PoolProcesExecutor, но в примере ниже логи основного процесса не сохраняются в файле. Чего мне не хватает?
import logging
import logging.handlers
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
import sys
import traceback
from random import choice

# Arrays used for random selections in this demo
LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL]

NUMBER_OF_WORKERS = 2
NUMBER_OF_POOL_WORKERS = 1
NUMBER_OF_MESSAGES = 1

LOG_SIZE = 1024 # 10 KB
BACKUP_COUNT = 10
LOG_FORMAT = '%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s'
LOG_NAME = 'mptest.log'

def configure_logging_format():
"""
Configure the listener process logging.
"""
root = logging.getLogger()
h = logging.handlers.RotatingFileHandler(LOG_NAME, 'a', LOG_SIZE, BACKUP_COUNT)
f = logging.Formatter(LOG_FORMAT)
h.setLevel(logging.DEBUG)
h.setFormatter(f)
root.addHandler(h)

def main_process_listener(queue: multiprocessing.Queue):
"""
This is the listener process top-level loop: wait for logging events
(LogRecords)on the queue and handle them, quit when you get a None for a
LogRecord.

Parameters
----------
queue: Queue
Queue to get the log records from.
"""
print('Trigger main listener.')
configure_logging_format()
while True:
try:
record = queue.get()
if record is None: # sentinel to tell the listener to quit.
break
logger = logging.getLogger(record.name)
logger.handle(record) # No level or filter logic applied - just do it!
except Exception:
traceback.print_exc(file=sys.stderr)

def broadcast_logs_from_pool_to_main_listener(pool_process_queue, main_process_queue):
"""
This is the listener process top-level loop: wait for logging events from the pool process
and broadcast them to the main listener process.

pool_process_queue: Queue
Pool process queue to get the log records from.
main_process_queue: Queue
Main process queue to put the log records to.
"""
print('Broadcasting logs from pool to main listener.')
# configure_logging_format()
while True:
try:
record = pool_process_queue.get()
if record is None: # sentinel to tell the listener to quit.
break
# TODO: apply level of filtering
main_process_queue.put(record)
except Exception:
traceback.print_exc(file=sys.stderr)

def configure_logging_for_multiprocessing(queue):
"""
The worker configuration is done at the start of the worker process run.
Note that on Windows you can't rely on fork semantics, so each process
will run the logging configuration code when it starts.
"""
print('Configuring logging for multiprocessing...')
h = logging.handlers.QueueHandler(queue) # Handler needed to send records to queue
root = logging.getLogger()
root.addHandler(h)
# send all messages, for demo; no other level or filter logic applied.
root.setLevel(logging.DEBUG)

def pool_process(queue):
configure_logging_for_multiprocessing(queue)

name = multiprocessing.current_process().name
print('Pool process started: %s' % name)
logging.getLogger(name).log(choice(LEVELS), 'message')
print('Pool process finished: %s' % name)

def worker_process(queue):
"""
Worker process that logs messages to the queue.

Parameters
----------
queue: Queue
Queue to log the messages to.
"""
configure_logging_for_multiprocessing(queue)

pool_queue = multiprocessing.Manager().Queue(-1)
lp = multiprocessing.Process(target=broadcast_logs_from_pool_to_main_listener, args=(pool_queue, queue))
lp.start()

# Create ProcessPoolExecutor
executor = ProcessPoolExecutor(max_workers=NUMBER_OF_POOL_WORKERS)
for i in range(NUMBER_OF_POOL_WORKERS):
executor.submit(pool_process, pool_queue)

# Send message
name = multiprocessing.current_process().name
print('Worker started: %s' % name)
logging.getLogger(name).log(choice(LEVELS), 'message')
print('Worker finished: %s' % name)

# Shutdown the executor and the listener
executor.shutdown()
pool_queue.put_nowait(None)

if __name__ == '__main__':
main_logging_queue = multiprocessing.Manager().Queue()

# Start the listener process
lp = multiprocessing.Process(target=main_process_listener, args=(main_logging_queue,))
lp.start()

logging.getLogger('main_1').log(choice(LEVELS), 'main process 1')

# Start the worker processes
workers = []
for i in range(NUMBER_OF_WORKERS):
worker = multiprocessing.Process(target=worker_process, args=(main_logging_queue,))
workers.append(worker)
worker.start()

# Log a message from the main process
logging.getLogger('main_2').log(choice(LEVELS), 'main process 1')

# Wait for all of the workers to finish
for w in workers:
w.join()

main_logging_queue.put_nowait(None)
lp.join()


Подробнее здесь: https://stackoverflow.com/questions/787 ... ss-workers
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Ведение журнала Python: создайте журнал, если он не существует, или откройте и продолжите ведение журнала, если он сущес
    Anonymous » » в форуме Python
    0 Ответы
    46 Просмотры
    Последнее сообщение Anonymous
  • Ведение журнала Python, цветное ведение журнала
    Anonymous » » в форуме Python
    0 Ответы
    40 Просмотры
    Последнее сообщение Anonymous
  • Ведение журнала Python, цветное ведение журнала
    Anonymous » » в форуме Python
    0 Ответы
    40 Просмотры
    Последнее сообщение Anonymous
  • Ведение журнала Python, цветное ведение журнала
    Anonymous » » в форуме Python
    0 Ответы
    22 Просмотры
    Последнее сообщение Anonymous
  • Ведение журнала Python, цветное ведение журнала
    Anonymous » » в форуме Python
    0 Ответы
    28 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»