Как правильно использовать многопроцессорность Python в классе? ⇐ Python
Как правильно использовать многопроцессорность Python в классе?
В настоящее время я работаю над приложением Python FastAPI, которое создает циклы занятости в параллельных потоках с использованием библиотеки многопроцессорности. Когда я впервые начал создавать приложение, я создал потоки во время запуска в файле main.py. Это сработало отлично, и я смог без проблем создать все нужные мне темы.
В конце концов мне понадобились определенные конечные точки FastAPI, чтобы иметь возможность контролировать эти потоки, поэтому вместо того, чтобы создавать угрозы при запуске приложения, я создал одноэлементный файл класса, который мог бы управлять потоками и к которому мог бы получить доступ как процесс запуска, так и FastAPI. конечные точки.
Проблема в том, что как только я переместил метод Process из файла main.py в файл .core.thread_utils.py, я теперь получаю следующую ошибку при создании более одного потока:
... Файл «/home/user/Projects/test/test-api/api/core/thread_utils.py», строка 66, в launch_all_threads п.старт() Файл "/usr/lib/python3.8/multiprocessing/process.py", строка 121, в начале self._popen = self._Popen(сам) Файл «/usr/lib/python3.8/multiprocessing/context.py», строка 224, в _Popen return _default_context.get_context().Process._Popen(process_obj) Файл «/usr/lib/python3.8/multiprocessing/context.py», строка 284, в _Popen вернуть Popen(process_obj) Файл «/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py», строка 32, в __init__ супер().__init__(process_obj) Файл «/usr/lib/python3.8/multiprocessing/popen_fork.py», строка 19, в __init__ self._launch(process_obj) Файл «/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py», строка 47, в _launch сокращение.dump(process_obj, fp) Файл "/usr/lib/python3.8/multiprocessing/reduction.py", строка 60, в дампе ForkingPickler(файл, протокол).dump(obj) TypeError: невозможно выбрать объект «weakref»
Итак, код, который нормально работал в main.py, как только его выносят в отдельный класс, теперь выдает ошибку. Кроме того, я могу создать один поток без каких-либо проблем, но более одного потока выдает ошибку TypeError «невозможно выбрать». Я не понимаю почему.
После некоторого исследования того, как Python создает новые потоки, я считаю, что столкнулся с этой ошибкой. Кроме того, в документации по многопроцессорной обработке упоминается «безопасный импорт», но я не уверен, применимо ли это к этой ситуации и как это применимо, поскольку попытки применить это приводят к той же ошибке или полному отсутствию потоков. Возможно, я делаю это неправильно...
Код, который я использую, выглядит примерно так:
main.py:
из .core.thread_utils импортируйте thread_utils ... @app.on_event("startup") # это часть FastAPI асинхронная защита start_event(): # предыдущее создание потока было здесь и работало отлично thread_utils.launch_all_threads() core/thread_utils.py:
из многопроцессорного процесса импорта из ..core импорта busy_loop класс thread_utils: _self = Нет защита __new__(cls): если cls._self равен None: cls._self = super().__new__(cls) вернуть cls._self def launch_runner_thread(self, t:int): busy_loop (т) защита launch_all_threads (сам): t_list = [1,2] для t в t_list: p = Процесс(target=self.launch_runner_thread, args=(t,)) п.старт() Любая помощь, указания или рабочие примеры реализации многопроцессорной обработки в отдельном классе будут очень признательны!
В настоящее время я работаю над приложением Python FastAPI, которое создает циклы занятости в параллельных потоках с использованием библиотеки многопроцессорности. Когда я впервые начал создавать приложение, я создал потоки во время запуска в файле main.py. Это сработало отлично, и я смог без проблем создать все нужные мне темы.
В конце концов мне понадобились определенные конечные точки FastAPI, чтобы иметь возможность контролировать эти потоки, поэтому вместо того, чтобы создавать угрозы при запуске приложения, я создал одноэлементный файл класса, который мог бы управлять потоками и к которому мог бы получить доступ как процесс запуска, так и FastAPI. конечные точки.
Проблема в том, что как только я переместил метод Process из файла main.py в файл .core.thread_utils.py, я теперь получаю следующую ошибку при создании более одного потока:
... Файл «/home/user/Projects/test/test-api/api/core/thread_utils.py», строка 66, в launch_all_threads п.старт() Файл "/usr/lib/python3.8/multiprocessing/process.py", строка 121, в начале self._popen = self._Popen(сам) Файл «/usr/lib/python3.8/multiprocessing/context.py», строка 224, в _Popen return _default_context.get_context().Process._Popen(process_obj) Файл «/usr/lib/python3.8/multiprocessing/context.py», строка 284, в _Popen вернуть Popen(process_obj) Файл «/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py», строка 32, в __init__ супер().__init__(process_obj) Файл «/usr/lib/python3.8/multiprocessing/popen_fork.py», строка 19, в __init__ self._launch(process_obj) Файл «/usr/lib/python3.8/multiprocessing/popen_spawn_posix.py», строка 47, в _launch сокращение.dump(process_obj, fp) Файл "/usr/lib/python3.8/multiprocessing/reduction.py", строка 60, в дампе ForkingPickler(файл, протокол).dump(obj) TypeError: невозможно выбрать объект «weakref»
Итак, код, который нормально работал в main.py, как только его выносят в отдельный класс, теперь выдает ошибку. Кроме того, я могу создать один поток без каких-либо проблем, но более одного потока выдает ошибку TypeError «невозможно выбрать». Я не понимаю почему.
После некоторого исследования того, как Python создает новые потоки, я считаю, что столкнулся с этой ошибкой. Кроме того, в документации по многопроцессорной обработке упоминается «безопасный импорт», но я не уверен, применимо ли это к этой ситуации и как это применимо, поскольку попытки применить это приводят к той же ошибке или полному отсутствию потоков. Возможно, я делаю это неправильно...
Код, который я использую, выглядит примерно так:
main.py:
из .core.thread_utils импортируйте thread_utils ... @app.on_event("startup") # это часть FastAPI асинхронная защита start_event(): # предыдущее создание потока было здесь и работало отлично thread_utils.launch_all_threads() core/thread_utils.py:
из многопроцессорного процесса импорта из ..core импорта busy_loop класс thread_utils: _self = Нет защита __new__(cls): если cls._self равен None: cls._self = super().__new__(cls) вернуть cls._self def launch_runner_thread(self, t:int): busy_loop (т) защита launch_all_threads (сам): t_list = [1,2] для t в t_list: p = Процесс(target=self.launch_runner_thread, args=(t,)) п.старт() Любая помощь, указания или рабочие примеры реализации многопроцессорной обработки в отдельном классе будут очень признательны!
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение