Код: Выделить всё
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
print("f exits")
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
Первое, что я не понимаю: почему на самом деле возникает взаимоблокировка?
Я понимаю (вероятно, неправильно), что когда целевая функция f завершает работу, поскольку мы не вызывали q.cancel_join_thread(), тогда фоновый поток будет присоединен (т. е. будет вызван q.join_thread()), чтобы позволить ему поместить оставшиеся в буфере данные в фактическую общую очередь). Когда это будет сделано, процесс должен завершиться, и p.join() должен вернуться.
С другой стороны, если мы заменим большой объект 'X' * 1000000 на меньший 'X' * 100, он не заблокируется. Почему размер помещенного объекта влияет на поведение блокировки?
Я знаю, что переключение последних двух строк устраняет взаимоблокировку. Но я ожидаю, что query.get что-то создаст (получит что-то) в общем объекте, а q.put() не воздействует непосредственно на этот общий объект (очередь), а скорее поместит что-то в буфер, а фоновый поток позаботится о перемещении материала из буфера в фактический общий объект. Я мог бы понять утверждение типа «
Код: Выделить всё
join_thread EDIT 1
Мой второй вопрос касался ненадежного поведения get_nowait, проиллюстрированного в следующем MWE:
Код: Выделить всё
def f(q: Queue):
q.cancel_join_thread()
q.put('X' * 1000000)
print("f exits")
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
print("AFTER join")
obj = queue.get_nowait()
print("THE END", len(obj))
Спасибо q.cancel_join_thread() в начале целевой f, дочернему процессу разрешено завершить работу без присоединения к потоку, и это причина, по которой выполнение вышеуказанного MWE дает как минимум:
Код: Выделить всё
f exits
AFTER join
- либо «THE END...». Это означает, что поток передал большой объект, а последний был использован get_nowait()
- или исключением Empty, что означает, что большой объект не был передан в очередь
Вероятно, это причина явно случайного поведения, не так ли?
Мой последний вопрос:
В документации по многопроцессорности Python советуют использовать mp.manager.queue() для создания очереди, предназначенной для совместного использования между процессами. Действительно, если я заменю свой код следующим:
Код: Выделить всё
def f(q: Queue):
q.put('X' * 1000000)
print("f exits")
if __name__ == '__main__':
# queue = Queue()
man = Manager()
queue = man.Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join()
print("AFTER join")
time.sleep(1) # even 1 second after child process exited
obj = queue.get_nowait()
print("THE END", len(obj))
Мой вопрос: могу ли я наверно полагаться на тот факт, что объект, помещенный в такую прокси-очередь, не будет потерян, даже если дочерний процесс завершит работу?
Подробнее здесь: https://stackoverflow.com/questions/798 ... -returning
Мобильная версия