`multiprocessing.JoinableQueue.empty()` кажется сильно сломанным [закрыто]Python

Программы на Python
Ответить
Anonymous
 `multiprocessing.JoinableQueue.empty()` кажется сильно сломанным [закрыто]

Сообщение Anonymous »

Краткая версия. Я осознаю тот факт, что multiprocess.Queue.empty() ненадежен. Для меня это имеет смысл. Я понимаю, что действия одного потока могут занять некоторое время, прежде чем они станут видны другому потоку. В документации ясно указано, что:

После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод пустой() очереди вернет значение False. . . .

Я могу с этим жить. Но в моем коде я вижу, что ueue.empty() возвращает True, даже если в очереди содержится несколько сотен элементов.
Чтобы доказать свою правоту, я добавил в свой код общую переменную Task_count, увеличил ее (с блокировкой) в put() и уменьшил ее в get(). Его значение равно нулю в начале и конце моего кода, поэтому я считаю, что оно реализовано точно. Я использую MacOS, поэтому очередь.count() не работает. Я ясно вижу, что в моей очереди есть несколько задач, даже если она утверждает, что пуста.
Мой код представляет собой реализацию рекурсивного алгоритма, который использует очередь задач для отслеживания ожидающих работ. Изначально в очереди задач есть одна задача, но эта задача создает дополнительные задачи.

Код: Выделить всё

class Worker(multiprocessing.Process):
# task_queue is a JoinableQueue.
def __init__(task_queue, result_queue, task_queue_count):
self.task_queue = task_queue
self.result_queue = result_queue
self.task_queue_count = task_queue_count

def run(self):
info = self.task_queue.get()
# Following is for debugging, only
with self.task_queue_count.get_lock():
self.task_queue_count.value -= 1

if info is None:  # Poison pill for shutdown
self.task_queue.task_done()
break
self.handle_one_task(info)
self.task_queue.task_done()

def handle_one_task(self, info):
local_queue = deque([info])
while local_queue:
current_index, task = local_queue.popleft()
results = code_not_shown_here(current_index, task)

if not results:
continue

if current_index + 1 == MAX_RECURSION_DEPTH:
# 
continue

if self.task_queue.empty():
print("Queue appears empty", self.task_queue_count.value)

for result in results:
if len(local_queue) < MAX_LOCAL_QUEUE_SIZE:
local_queue.append((current_index + 1), result)
else:
self.task_queue.put((current_index + 1), result)
# following is for debugging only
with self.task_queue_count.get_lock():
self.task_queue_count.value += 1
В коде используется локальная очередь, чтобы избежать чрезмерного перемещения в очередь задач и из нее. если размер результатов невелик, он может эмулировать хвостовую рекурсию. Однако когда я действительно проверял self.task_queue.empty(), я получал значение True, когда self.task_queue_count.value было большим.
Я пытался написать код так, чтобы, если очередь задач пуста, последний цикл помещал все свои результаты в очередь задач, а не пытался обрабатывать их локально. Могут быть другие потоки, ожидающие выполнения своей работы.

Подробнее здесь: https://stackoverflow.com/questions/798 ... dly-broken
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»