После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод пустой() очереди вернет значение False. . . .
Я могу с этим жить. Но в моем коде я вижу, что ueue.empty() возвращает True, даже если в очереди содержится несколько сотен элементов.
Чтобы доказать свою правоту, я добавил в свой код общую переменную Task_count, увеличил ее (с блокировкой) в put() и уменьшил ее в get(). Его значение равно нулю в начале и конце моего кода, поэтому я считаю, что оно реализовано точно. Я использую MacOS, поэтому очередь.count() не работает. Я ясно вижу, что в моей очереди есть несколько задач, даже если она утверждает, что пуста.
Мой код представляет собой реализацию рекурсивного алгоритма, который использует очередь задач для отслеживания ожидающих работ. Изначально в очереди задач есть одна задача, но эта задача создает дополнительные задачи.
Код: Выделить всё
class Worker(multiprocessing.Process):
# task_queue is a JoinableQueue.
def __init__(task_queue, result_queue, task_queue_count):
self.task_queue = task_queue
self.result_queue = result_queue
self.task_queue_count = task_queue_count
def run(self):
info = self.task_queue.get()
# Following is for debugging, only
with self.task_queue_count.get_lock():
self.task_queue_count.value -= 1
if info is None: # Poison pill for shutdown
self.task_queue.task_done()
break
self.handle_one_task(info)
self.task_queue.task_done()
def handle_one_task(self, info):
local_queue = deque([info])
while local_queue:
current_index, task = local_queue.popleft()
results = code_not_shown_here(current_index, task)
if not results:
continue
if current_index + 1 == MAX_RECURSION_DEPTH:
#
continue
if self.task_queue.empty():
print("Queue appears empty", self.task_queue_count.value)
for result in results:
if len(local_queue) < MAX_LOCAL_QUEUE_SIZE:
local_queue.append((current_index + 1), result)
else:
self.task_queue.put((current_index + 1), result)
# following is for debugging only
with self.task_queue_count.get_lock():
self.task_queue_count.value += 1
Я пытался написать код так, чтобы, если очередь задач пуста, последний цикл помещал все свои результаты в очередь задач, а не пытался обрабатывать их локально. Могут быть другие потоки, ожидающие выполнения своей работы.
Подробнее здесь: https://stackoverflow.com/questions/798 ... dly-broken
Мобильная версия