Краткая версия. Я осознаю тот факт, что multiprocess.Queue.empty() ненадежен. Для меня это имеет смысл. Я понимаю, что действия одного потока могут занять некоторое время, прежде чем они станут видны другому потоку. В документации ясно указано, что:
После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод пустой() очереди вернет значение False. . . .
Я могу с этим жить. Но в моем коде я вижу, что методqueue.empty() возвращает True, даже если в очереди много элементов.
Эта программа представляет собой поддельную часть рекурсии. Очередь задач содержит целые числа, указывающие текущую глубину рекурсии.
Для глубины 0 мы создаем 500 элементов глубины 1. Для глубины меньше MAX_RECURSION_DEPTH мы создаем случайное количество задач глубины + 1; мы используем распределение Парето, так что числа в основном маленькие и усекаются до 0, но иногда попадаются и большие числа. Для глубины MAX_RECURSION_DEPTH мы ничего не делаем.
Код: Выделить всё
import multiprocessing
import queue
from collections import deque
import numpy as np
START_DEPTH = 0
MAX_RECURSION_DEPTH = 5
MAX_LOCAL_QUEUE_SIZE = 1000
WORKER_COUNT = 4
def run_workers():
task_queue = multiprocessing.JoinableQueue(MAX_LOCAL_QUEUE_SIZE)
task_count = multiprocessing.Value("L")
print_lock = multiprocessing.Lock()
task_queue.put(START_DEPTH)
task_count.value = 1
workers = [Worker(task_queue, task_count, print_lock) for i in range(WORKER_COUNT)]
for worker in workers:
worker.start()
task_queue.join()
for worker in workers:
worker.kill()
class Worker(multiprocessing.Process):
# task_queue is a JoinableQueue.
def __init__(self, task_queue, task_queue_count, print_lock):
super().__init__()
self.task_queue = task_queue
self.task_queue_count = task_queue_count
self.print_lock = print_lock
def run(self):
while True:
depth = self.task_queue.get()
# Following is for debugging, only
with self.task_queue_count.get_lock():
self.task_queue_count.value -= 1
self.handle_one_task(depth)
self.task_queue.task_done()
def handle_one_task(self, depth):
with self.print_lock:
print(f'{self.name} {depth=} qs={self.task_queue_count.value}')
local_queue = deque([depth])
while local_queue:
current_index = local_queue.popleft()
if current_index == START_DEPTH:
results_count = 50
else:
# usually a small number, but it has a large tail
results_count = int(np.random.pareto(1))
if current_index + 1 == MAX_RECURSION_DEPTH:
#
continue
# Print a message if task_queue_empty() is true, yet there appear to
# be items in the task queue.
if self.task_queue.empty():
temp = self.task_queue_count.value
if temp > 0:
with self.print_lock:
print(f"{self.name} Queue appears empty qs={temp}")
task_queue_full = False # I don't trust task_queue.full()
write_count = 0
for _ in range(results_count):
# At the START_DEPTH, there is only a single task, so we want to get
# other processes started. Otherwise, we prefer to write to the
# local queue if its not full.
if current_index == START_DEPTH or (len(local_queue) >= MAX_LOCAL_QUEUE_SIZE and not task_queue_full):
try:
self.task_queue.put(current_index + 1, False)
with self.task_queue_count.get_lock():
self.task_queue_count.value += 1
write_count += 1
continue
except queue.Full:
task_queue_full = True
# fall through to writing to local queue
local_queue.append(current_index + 1)
if write_count > 0:
with self.print_lock:
print(f'{self.name} {depth=} {write_count=} qs={self.task_queue_count.value}')
if __name__ == '__main__':
run_workers()
К сожалению, когда я запускаю этот код, я вижу вывод следующего вида:
Код: Выделить всё
Worker-2 Queue appears empty qs=14
Worker-4 Queue appears empty qs=14
Worker-2 Queue appears empty qs=14
Worker-1 depth=4 qs=16
Worker-4 Queue appears empty qs=19
Worker-2 Queue appears empty qs=24
Worker-4 Queue appears empty qs=30
Обратите внимание, что я работаю под MacOS, поэтому очередь.qsize() не реализована.>
Подробнее здесь: https://stackoverflow.com/questions/798 ... ems-broken
Мобильная версия