Многопроцессорность Python: неправильная синхронизация привела к зависанию подпроцесса в waiter.acquire() ⇐ Python
Многопроцессорность Python: неправильная синхронизация привела к зависанию подпроцесса в waiter.acquire()
Я писал параллельный рабочий процесс для небольшого тестового проекта, который включал следующий рабочий процесс:
[*]Существует четыре отдельные процедуры, каждая из которых принимает один параметр и возвращает одну переменную в качестве данных, необходимых для следующей процедуры. Для подключения каждого было 5 объектов multiprocess.Queue (избыточный в конце для единообразия поведения функции). [*]Каждая функция окружена простым демоном, который (то, что я собираюсь сделать) циклически обрабатывает данные в очереди, передавая их следующей процедуре, одновременно прослушивая глобальный флаг, уведомляющий о том, что входная очередь пуста. связано с окончанием обработки или перегрузкой дальше по конвейеру. Код такой:
def ProcessWrapper(procedure, input_queue, output_queue, total_tasks, Done_queue): GlobalFlag = (done_queue.qsize() == total_tasks) пока не GlobalFlag: GlobalFlag = (done_queue.qsize() == total_tasks) пытаться: единица = input_queue.get() обратный вызов = процедура (единица) кроме очереди.Пусто: если не GlobalFlag: продолжать еще: перерыв пока правда: пытаться: output_queue.put(обратный вызов) кроме очереди.Полный: время.сон(60) продолжать перерыв input_queue.task_done() [*]Этот демон будет вызываться multiprocess.Pool в подпроцессах для выполнения различных заданий. Поскольку очередь ввода/вывода является итеративной, я использовал генератор для вызова pool.starmap(). Код следующий: # Получение списка объекта функции директория_процедуры = директория(Процедура.Синглтон) процедуры_функции = [ атрибут для атрибута в каталоге_процедуры если проверить.isfunction(getattr(Procedure.Singleton, атрибут)) ] # Начальный код Защитный тест(): менеджер = mp.Manager() # Создать очередь файлов и проанализировать очередь io_queue = { label: Manager.Queue() для метки в ["file_queue", "post_queue", "analyze_queue", "clean_queue", "done_queue"] } # Опущен код для помещения начальной задачи в file_queue. pbar.total = len(список_файлов) с mp.get_context("spawn").Pool(len(procedure_functions)) в качестве пула: пул.starmap( Процедура.ProcessWrapper, [ (процедура, io_queue[list(io_queue.keys())], io_queue[list(io_queue.keys())[i + 1]], len(file_list), io_queue[list(io_queue.keys()) [-1]]) # для i процедура в перечислении (procedure_functions[:-1]) ], ) Но после запуска код зависает в первом порожденном подпроцессе (он успешно породил 5 подпроцессов, что для меня абсурдно). Когда я получил трассировку из GDB, мне показалось, что код застрял в вызове waiter.acquire() в коде модуля. Я определенно сделал неправильно с флагом, должен был использовать фактическую блокировку, но я сомневаюсь, что это причина, поскольку код даже не превратился в первую функцию первого цикла (которая, по крайней мере, записала бы файл ).
Я знаю, что определенно сделал что-то не так, но не уверен, где именно, поскольку у меня нет никакого предыдущего опыта, кроме документа, предоставленного Python. Был бы признателен, если бы кто-нибудь мог проинструктировать меня, как устранять подобные проблемы, или указать, где у меня есть фундаментальное непонимание того, как работает параллелизм.
Я просмотрел ответы на другие актуальные вопросы, включая использование контекста spawn вместо fork и возможную утечку информации в with. > предложения. Но ни одно из предложенных ими решений не решило эту проблему.
Я писал параллельный рабочий процесс для небольшого тестового проекта, который включал следующий рабочий процесс:
[*]Существует четыре отдельные процедуры, каждая из которых принимает один параметр и возвращает одну переменную в качестве данных, необходимых для следующей процедуры. Для подключения каждого было 5 объектов multiprocess.Queue (избыточный в конце для единообразия поведения функции). [*]Каждая функция окружена простым демоном, который (то, что я собираюсь сделать) циклически обрабатывает данные в очереди, передавая их следующей процедуре, одновременно прослушивая глобальный флаг, уведомляющий о том, что входная очередь пуста. связано с окончанием обработки или перегрузкой дальше по конвейеру. Код такой:
def ProcessWrapper(procedure, input_queue, output_queue, total_tasks, Done_queue): GlobalFlag = (done_queue.qsize() == total_tasks) пока не GlobalFlag: GlobalFlag = (done_queue.qsize() == total_tasks) пытаться: единица = input_queue.get() обратный вызов = процедура (единица) кроме очереди.Пусто: если не GlobalFlag: продолжать еще: перерыв пока правда: пытаться: output_queue.put(обратный вызов) кроме очереди.Полный: время.сон(60) продолжать перерыв input_queue.task_done() [*]Этот демон будет вызываться multiprocess.Pool в подпроцессах для выполнения различных заданий. Поскольку очередь ввода/вывода является итеративной, я использовал генератор для вызова pool.starmap(). Код следующий: # Получение списка объекта функции директория_процедуры = директория(Процедура.Синглтон) процедуры_функции = [ атрибут для атрибута в каталоге_процедуры если проверить.isfunction(getattr(Procedure.Singleton, атрибут)) ] # Начальный код Защитный тест(): менеджер = mp.Manager() # Создать очередь файлов и проанализировать очередь io_queue = { label: Manager.Queue() для метки в ["file_queue", "post_queue", "analyze_queue", "clean_queue", "done_queue"] } # Опущен код для помещения начальной задачи в file_queue. pbar.total = len(список_файлов) с mp.get_context("spawn").Pool(len(procedure_functions)) в качестве пула: пул.starmap( Процедура.ProcessWrapper, [ (процедура, io_queue[list(io_queue.keys())], io_queue[list(io_queue.keys())[i + 1]], len(file_list), io_queue[list(io_queue.keys()) [-1]]) # для i процедура в перечислении (procedure_functions[:-1]) ], ) Но после запуска код зависает в первом порожденном подпроцессе (он успешно породил 5 подпроцессов, что для меня абсурдно). Когда я получил трассировку из GDB, мне показалось, что код застрял в вызове waiter.acquire() в коде модуля. Я определенно сделал неправильно с флагом, должен был использовать фактическую блокировку, но я сомневаюсь, что это причина, поскольку код даже не превратился в первую функцию первого цикла (которая, по крайней мере, записала бы файл ).
Я знаю, что определенно сделал что-то не так, но не уверен, где именно, поскольку у меня нет никакого предыдущего опыта, кроме документа, предоставленного Python. Был бы признателен, если бы кто-нибудь мог проинструктировать меня, как устранять подобные проблемы, или указать, где у меня есть фундаментальное непонимание того, как работает параллелизм.
Я просмотрел ответы на другие актуальные вопросы, включая использование контекста spawn вместо fork и возможную утечку информации в with. > предложения. Но ни одно из предложенных ими решений не решило эту проблему.
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Ошибка VSCode: «Команда Python привела к ошибке (команда Python.setInterpreter не найдена)
Anonymous » » в форуме Python - 0 Ответы
- 41 Просмотры
-
Последнее сообщение Anonymous
-