Многопроцессорность Python: неправильная синхронизация привела к зависанию подпроцесса в waiter.acquire()Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Многопроцессорность Python: неправильная синхронизация привела к зависанию подпроцесса в waiter.acquire()

Сообщение Anonymous »


Я писал параллельный рабочий процесс для небольшого тестового проекта, который включал следующий рабочий процесс:
[*]Существует четыре отдельные процедуры, каждая из которых принимает один параметр и возвращает одну переменную в качестве данных, необходимых для следующей процедуры. Для подключения каждого было 5 объектов multiprocess.Queue (избыточный в конце для единообразия поведения функции). [*]Каждая функция окружена простым демоном, который (то, что я собираюсь сделать) циклически обрабатывает данные в очереди, передавая их следующей процедуре, одновременно прослушивая глобальный флаг, уведомляющий о том, что входная очередь пуста. связано с окончанием обработки или перегрузкой дальше по конвейеру. Код такой:
def ProcessWrapper(procedure, input_queue, output_queue, total_tasks, Done_queue): GlobalFlag = (done_queue.qsize() == total_tasks) пока не GlobalFlag: GlobalFlag = (done_queue.qsize() == total_tasks) пытаться: единица = input_queue.get() обратный вызов = процедура (единица) кроме очереди.Пусто: если не GlobalFlag: продолжать еще: перерыв пока правда: пытаться: output_queue.put(обратный вызов) кроме очереди.Полный: время.сон(60) продолжать перерыв input_queue.task_done() [*]Этот демон будет вызываться multiprocess.Pool в подпроцессах для выполнения различных заданий. Поскольку очередь ввода/вывода является итеративной, я использовал генератор для вызова pool.starmap(). Код следующий: # Получение списка объекта функции директория_процедуры = директория(Процедура.Синглтон) процедуры_функции = [ атрибут для атрибута в каталоге_процедуры если проверить.isfunction(getattr(Procedure.Singleton, атрибут)) ] # Начальный код Защитный тест(): менеджер = mp.Manager() # Создать очередь файлов и проанализировать очередь io_queue = { label: Manager.Queue() для метки в ["file_queue", "post_queue", "analyze_queue", "clean_queue", "done_queue"] } # Опущен код для помещения начальной задачи в file_queue. pbar.total = len(список_файлов) с mp.get_context("spawn").Pool(len(procedure_functions)) в качестве пула: пул.starmap( Процедура.ProcessWrapper, [ (процедура, io_queue[list(io_queue.keys())], io_queue[list(io_queue.keys())[i + 1]], len(file_list), io_queue[list(io_queue.keys()) [-1]]) # для i процедура в перечислении (procedure_functions[:-1]) ], ) Но после запуска код зависает в первом порожденном подпроцессе (он успешно породил 5 подпроцессов, что для меня абсурдно). Когда я получил трассировку из GDB, мне показалось, что код застрял в вызове waiter.acquire() в коде модуля. Я определенно сделал неправильно с флагом, должен был использовать фактическую блокировку, но я сомневаюсь, что это причина, поскольку код даже не превратился в первую функцию первого цикла (которая, по крайней мере, записала бы файл ).

Я знаю, что определенно сделал что-то не так, но не уверен, где именно, поскольку у меня нет никакого предыдущего опыта, кроме документа, предоставленного Python. Был бы признателен, если бы кто-нибудь мог проинструктировать меня, как устранять подобные проблемы, или указать, где у меня есть фундаментальное непонимание того, как работает параллелизм.

Я просмотрел ответы на другие актуальные вопросы, включая использование контекста spawn вместо fork и возможную утечку информации в with. > предложения. Но ни одно из предложенных ими решений не решило эту проблему.
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Каково истинное значение Release-Acquire в C++?
    Anonymous » » в форуме C++
    0 Ответы
    75 Просмотры
    Последнее сообщение Anonymous
  • Что делает Memory_order::acquire для операции чтения-изменения-записи?
    Anonymous » » в форуме C++
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Что делает Memory_order::acquire для операции чтения-изменения-записи?
    Anonymous » » в форуме C++
    0 Ответы
    21 Просмотры
    Последнее сообщение Anonymous
  • Asyncpg, когда использовать, Получите новое соединение с pool.acquire vs Поделиться одним соединением
    Anonymous » » в форуме Python
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Ошибка VSCode: «Команда Python привела к ошибке (команда Python.setInterpreter не найдена)
    Anonymous » » в форуме Python
    0 Ответы
    41 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»