Multiprocessing.JoinableQueue.empty() кажется сломаннымPython

Программы на Python
Ответить
Anonymous
 Multiprocessing.JoinableQueue.empty() кажется сломанным

Сообщение Anonymous »

[Я повторно задаю вопрос, который был только что закрыт. Теперь у меня есть код, который дает сбой.]
Краткая версия. Я осознаю тот факт, что multiprocess.Queue.empty() ненадежен. Для меня это имеет смысл. Я понимаю, что действия одного потока могут занять некоторое время, прежде чем они станут видны другому потоку. В документации ясно указано, что:

После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод пустой() очереди вернет значение False. . . .

Я могу с этим жить. Но в моем коде я вижу, что методqueue.empty() возвращает True, даже если в очереди много элементов.
Эта программа представляет собой поддельную часть рекурсии. Очередь задач содержит целые числа, указывающие текущую глубину рекурсии.
Для глубины 0 мы создаем 500 элементов глубины 1. Для глубины меньше MAX_RECURSION_DEPTH мы создаем случайное количество задач глубины + 1; мы используем распределение Парето, так что числа в основном маленькие и усекаются до 0, но иногда попадаются и большие числа. Для глубины MAX_RECURSION_DEPTH мы ничего не делаем.

Код: Выделить всё

import multiprocessing
import queue
from collections import deque

import numpy as np

START_DEPTH = 0
MAX_RECURSION_DEPTH = 5
MAX_LOCAL_QUEUE_SIZE = 1000
WORKER_COUNT = 4

def run_workers():
task_queue = multiprocessing.JoinableQueue(MAX_LOCAL_QUEUE_SIZE)
task_count = multiprocessing.Value("L")
print_lock = multiprocessing.Lock()

task_queue.put(START_DEPTH)
task_count.value = 1

workers = [Worker(task_queue, task_count, print_lock) for i in range(WORKER_COUNT)]
for worker in workers:
worker.start()

task_queue.join()
for worker in workers:
worker.kill()

class Worker(multiprocessing.Process):
# task_queue is a JoinableQueue.
def __init__(self, task_queue, task_queue_count, print_lock):
super().__init__()
self.task_queue = task_queue
self.task_queue_count = task_queue_count
self.print_lock = print_lock

def run(self):
while True:
depth = self.task_queue.get()
# Following is for debugging, only
with self.task_queue_count.get_lock():
self.task_queue_count.value -= 1
self.handle_one_task(depth)
self.task_queue.task_done()

def handle_one_task(self, depth):
with self.print_lock:
print(f'{self.name} {depth=} qs={self.task_queue_count.value}')
local_queue = deque([depth])
while local_queue:
current_index = local_queue.popleft()

if current_index == START_DEPTH:
results_count = 50
else:
# usually a small number, but it has a large tail
results_count = int(np.random.pareto(1))

if current_index + 1 == MAX_RECURSION_DEPTH:
# 
continue

# Print a message if task_queue_empty() is true, yet there appear to
# be items in the task queue.
if self.task_queue.empty():
temp = self.task_queue_count.value
if temp > 0:
with self.print_lock:
print(f"{self.name} Queue appears empty qs={temp}")

task_queue_full = False   # I don't trust task_queue.full()
write_count = 0
for _ in range(results_count):
# At the START_DEPTH, there is only a single task, so we want to get
# other processes started.  Otherwise, we prefer to write to the
# local queue if its not full.
if current_index == START_DEPTH or (len(local_queue) >= MAX_LOCAL_QUEUE_SIZE and not task_queue_full):
try:
self.task_queue.put(current_index + 1, False)
with self.task_queue_count.get_lock():
self.task_queue_count.value += 1
write_count += 1
continue
except queue.Full:
task_queue_full = True
# fall through to writing to local queue
local_queue.append(current_index + 1)

if write_count > 0:
with self.print_lock:
print(f'{self.name} {depth=} {write_count=} qs={self.task_queue_count.value}')

if __name__ == '__main__':
run_workers()
В настоящее время я предпочитаю писать в локальную очередь. Но если очередь задач пуста (я знаю, что это происходит, когда current_index == START_DEPTH, а также может быть и в другое время), тогда я хотел бы поручить немного работы другим процессам. Я надеялся использовать self.task_queue.empty() для обоих случаев.
К сожалению, когда я запускаю этот код, я вижу вывод следующего вида:

Код: Выделить всё

Worker-2 Queue appears empty qs=14
Worker-4 Queue appears empty qs=14
Worker-2 Queue appears empty qs=14
Worker-1 depth=4 qs=16
Worker-4 Queue appears empty qs=19
Worker-2 Queue appears empty qs=24
Worker-4 Queue appears empty qs=30
Это не «бесконечно малая задержка».
Обратите внимание, что я работаю под MacOS, поэтому очередь.qsize() не реализована.>

Подробнее здесь: https://stackoverflow.com/questions/798 ... ems-broken
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»