В многопроцессорной обработке Python делитесь очередью и get_nowait не возвращаетсяPython

Программы на Python
Ответить
Anonymous
 В многопроцессорной обработке Python делитесь очередью и get_nowait не возвращается

Сообщение Anonymous »

Я пытаюсь глубже понять общие очереди многопроцессорной обработки Python и так далее. В частности, некоторые варианты MWE приведены в документации https://docs.python.org/3.9/library/mul ... rt-methods, раздел «Присоединение к процессам, использующим очереди», который я воспроизвожу здесь:

Код: Выделить всё

from multiprocessing import Process, Queue

def f(q):
q.put('X' * 1000000)
print("f exits")

if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join()                    # this deadlocks
obj = queue.get()
Выполнение этого кода приводит к выходу f, а затем скрипты блокируются, как утверждается в документации.
Первое, что я не понимаю: почему на самом деле возникает взаимоблокировка?
Я понимаю (вероятно, неправильно), что когда целевая функция f завершает работу, поскольку мы не вызывали q.cancel_join_thread(), тогда фоновый поток будет присоединен (т. е. будет вызван q.join_thread()), чтобы позволить ему поместить оставшиеся в буфере данные в фактическую общую очередь). Когда это будет сделано, процесс должен завершиться, и p.join() должен вернуться.
С другой стороны, если мы заменим большой объект 'X' * 1000000 на меньший 'X' * 100, он не заблокируется. Почему размер помещенного объекта влияет на поведение блокировки?
Я знаю, что переключение последних двух строк устраняет взаимоблокировку. Но я ожидаю, что query.get что-то создаст (получит что-то) в общем объекте, а q.put() не воздействует непосредственно на этот общий объект (очередь), а скорее поместит что-то в буфер, а фоновый поток позаботится о перемещении материала из буфера в фактический общий объект. Я мог бы понять утверждение типа «

Код: Выделить всё

join_thread 
гарантирует, что все данные, помещенные в очередь, были получены с другой стороны, а затем позволяет дочернему процессу выйти", но это не тот случай, когда большой объект заменяется меньшим.
EDIT 1
Мой второй вопрос касался ненадежного поведения get_nowait, проиллюстрированного в следующем MWE:

Код: Выделить всё

def f(q: Queue):
q.cancel_join_thread()
q.put('X' * 1000000)
print("f exits")

if __name__ == '__main__':
queue = Queue()

p = Process(target=f, args=(queue,))
p.start()
p.join()                    # this deadlocks
print("AFTER join")
obj  = queue.get_nowait()
print("THE END", len(obj))
Благодаря ответу @david-maze я понял, что большой объект в очереди, вероятно, слишком велик для unix-канала, лежащего в основе общей очереди, и поэтому поток (ответственный за передачу данных q.put в фактическую общую очередь) не будет буферизовать его, а скорее будет ждать, пока другой конец канала воспользуется этим объектом.
Спасибо q.cancel_join_thread() в начале целевой f, дочернему процессу разрешено завершить работу без присоединения к потоку, и это причина, по которой выполнение вышеуказанного MWE дает как минимум:

Код: Выделить всё

f exits
AFTER join
Самое смешное, что последняя часть кода выдает случайным образом:
  • либо «THE END...». Это означает, что поток передал большой объект, а последний был использован get_nowait()
  • или исключением Empty, что означает, что большой объект не был передан в очередь
Предполагаю, что причины в следующем: дочерний процесс завершается, не вызывая join_thread(). Поэтому поток продолжает жить и имеет возможность дождаться потребления большого объекта. Но с другой стороны, вскоре после завершения дочернего процесса поток, вероятно, будет собран мусором Unix, потому что его родительский процесс больше не существует.
Вероятно, это причина явно случайного поведения, не так ли?
Мой последний вопрос:
В документации по многопроцессорности Python советуют использовать mp.manager.queue() для создания очереди, предназначенной для совместного использования между процессами. Действительно, если я заменю свой код следующим:

Код: Выделить всё

def f(q: Queue):
q.put('X' * 1000000)
print("f exits")

if __name__ == '__main__':
# queue = Queue()
man = Manager()
queue = man.Queue()

p = Process(target=f, args=(queue,))
p.start()
p.join()
print("AFTER join")
time.sleep(1)  # even 1 second after child process exited
obj  = queue.get_nowait()
print("THE END", len(obj))
Тогда я всегда получаю большой объект из очереди при вызове очереди.get_nowait(), даже если дочерний процесс был закрыт какое-то время. Вероятно, Queue() из Manager (который относится к классу AutoProxy[Queue]) имеет в фоновом режиме другой механизм для работы с буфером, очередью, ...
Мой вопрос: могу ли я наверно полагаться на тот факт, что объект, помещенный в такую ​​прокси-очередь, не будет потерян, даже если дочерний процесс завершит работу?

Подробнее здесь: https://stackoverflow.com/questions/798 ... -returning
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»