Правильный способ многопроцессорной обработки в приложении PySide6Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Правильный способ многопроцессорной обработки в приложении PySide6

Сообщение Anonymous »

Я пишу приложение с графическим интерфейсом PySide6, в котором планирую реализовать модуль обработки (ProcessingModule) с переменным количеством выделенных процессов.
Модуль будет принимать информацию о требуемой функции и ее аргументах для выполнения.
В этом случае я намеревался использовать многопроцессорную библиотеку.
Я пытаюсь достичь следующих ключевых моментов:
  • Как ProcessingModule не ожидается, что будет слишком много доступных процессов, ожидающие запросы будут сохранены в pending_tasks складываются и выскакивают, как только некоторые из Workers завершаются.
  • В идеале класс Worker должен быть унифицирован для каждого тип функции и аргументы, которые она должна выполнять.
  • Я хотел бы выдавать результат Worker несколькими способами (или разными сигналами), в зависимости от того, какой функция Worker выполнена.
  • Я также не хочу использовать таймеры для извлечения данных из pending_tasks, так как это приведет к ненужной задержке (более подходящим будет прямой обратный вызов, отправленный Worker по завершении)
Основная проблема:
Проблема, с которой я столкнулся, заключается в том, что мой приложение использует объекты PySide6.QtCore, которые не являются Pickelable, поэтому не работает с созданием экземпляра multiprocessing.Process.
Хотя ProcessingModule может обрабатывать входящие данные для сериализации, у меня есть проблема с выяснением того, как гибко обрабатывать Worker< /code> вывод.
Попытки решения:
  • Один из подходов предложение ChatGPT заключалось в создании стек ответов, который будет очищен ProcessingModule и отправлен другим модулям.
    Однако мне не нравится этот вариант из-за набора функций, необходимых для гибкой обработки каждого типа ответа.
  • Следующим вариантом будет определение класса Worker в отдельном файле, который будет запускаться QProcess. Изначально я хотел избежать создания дополнительных модулей, так как ожидал, что эта задача будет проще. Однако сейчас этот подход кажется наиболее подходящим.
Это моя первая попытка разработки такого рода приложений. .
Я был бы благодарен за любой намек или, возможно, ссылку на подобные концепции.
Я бы предоставил оба кода, которые работают, но не используют QObjects (чтобы приблизить желаемое решение) и код, который их содержит.
Пример решение:
import logging
import random
import sys
import time
from multiprocessing import Process
from typing import Callable, List

from PySide6.QtCore import QObject, QCoreApplication, Signal, QTimer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Signals(QObject):
"""A class to define shared signals."""
worker_done = Signal(object)

class Worker:
def __init__(self, method: Callable, arguments: dict):
self.method = method
self.arguments = arguments

def run(self):
worker_name = f"Worker "
logging.info(f"{worker_name}: started.")
self.method(**self.arguments)
logging.info(f"{worker_name}: finished.")

class WorkerManager:
def __init__(self, process_count=5):
super().__init__()

self.name = f"WorkerManager"
self.max_processes = process_count
self.running_processes: List[Process] = []
self.pending_tasks: List[(str, int, int)] = []

# Timer to periodically check for completed processes
self.timer = QTimer()
self.timer.timeout.connect(self._check_running_processes)
self.timer.start(100) # Check every 100 ms

def run_workers(self, pending_tasks):
self.pending_tasks.extend(pending_tasks)
self._assign_tasks()

def _assign_tasks(self):
while len(self.running_processes) < self.max_processes and self.pending_tasks:
operation, num_a, num_b = self.pending_tasks.pop(0)

match(operation):
case '+':
method = add
case '-':
method = sub

worker = Worker(method=method, arguments={"num_a": num_a, "num_b": num_b})

worker_process = Process(target=worker.run, name=f"Worker_A{operation}B:")
worker_process.start()

self.running_processes.append(worker_process)
logging.info(f"{self.name}: Started Worker_A{operation}B.")

def _check_running_processes(self):
# Check for completed processes
for process in self.running_processes[:]: # Copy list to allow modification
if not process.is_alive(): # Process has finished
process.join() # Clean up the process
self.running_processes.remove(process)
logging.info(f"{self.name}: Worker process completed and cleaned up.")

# Assign more tasks if there are available slots
self._assign_tasks()

# Exit application if all tasks and processes are complete
if not self.pending_tasks and not self.running_processes:
logging.info(f"{self.name}: All tasks completed. Exiting application.")
self.timer.stop()
QCoreApplication.quit()

def add(num_a, num_b):
time.sleep(1)
logger.info(f"{num_a}+{num_b}={num_a+num_b}")

def sub(num_a, num_b):
time.sleep(1)
logger.info(f"{num_a}-{num_b}={num_a-num_b}")

if __name__ == "__main__":
app = QCoreApplication(sys.argv)

signals = Signals()
manager = WorkerManager(process_count=5)

length = 50
op_list = random.choices(["+", "-"], k=length)
a_list = random.choices(range(1, 50), k=length)
b_list = random.choices(range(1, 50), k=length)

arguments = [(op, a, b) for op, a, b in zip(op_list, a_list, b_list)]
manager.run_workers(arguments)

sys.exit(app.exec())

Код, содержащий объекты QObject:
import logging
import random
import sys
import time
from multiprocessing import Process
from typing import Callable, List

from PySide6.QtCore import QObject, QCoreApplication, Signal, QTimer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Signals(QObject):
"""A class to define shared signals."""
log_signal = Signal(str)
worker_done = Signal(object)

class Worker:
def __init__(self, method: Callable, arguments: dict):
self.method = method
self.arguments = arguments

def run(self):
worker_name = f"Worker "
logging.info(f"{worker_name}: started.")
self.method(**self.arguments)
logging.info(f"{worker_name}: finished.")

class WorkerManager:
def __init__(self, signals, process_count=5):
super().__init__()

self.name = f"WorkerManager"
self.max_processes = process_count
self.signals = signals
self.running_processes: List[Process] = []
self.pending_tasks: List[(str, int, int)] = []

self.signals.worker_done.connect(self._check_running_processes())

def run_workers(self, pending_tasks):
self.pending_tasks.extend(pending_tasks)
self._assign_tasks()

def _assign_tasks(self):
while len(self.running_processes) < self.max_processes and self.pending_tasks:
operation, num_a, num_b = self.pending_tasks.pop(0)

match(operation):
case '+':
method = self.add
case '-':
method = self.sub

worker = Worker(method=method, arguments={"num_a": num_a, "num_b": num_b})

worker_process = Process(target=worker.run, name=f"Worker_A{operation}B:")
worker_process.start()

self.running_processes.append(worker_process)
logging.info(f"{self.name}: Started Worker_A{operation}B.")

def _check_running_processes(self):
for process in self.running_processes[:]:
if not process.is_alive():
process.join()
self.running_processes.remove(process)
logging.info(f"{self.name}: Worker process completed and cleaned up.")

self._assign_tasks()

if not self.pending_tasks and not self.running_processes:
logging.info(f"{self.name}: All tasks completed. Exiting application.")
QCoreApplication.quit()

def add(self, num_a, num_b):
time.sleep(1)
logger.info(f"A+B={num_a+num_b}")

def sub(self, num_a, num_b):
time.sleep(1)
logger.info(f"A-B={num_a-num_b}")

def handle_add_result(self, result):
out = f"Result of A+B={result}."
logger.info(out)
self.signals.log_signal.emit(out)

def handle_sub_result(self, result):
out = f"Result of A-B={result}."
logger.info(out)
self.signals.log_signal.emit(out)

class LoggerModule(QObject):
def __init__(self, signals):
super().__init__()
self.signals = signals
self.signals.log_signal.connect(self.log_func)

def log_func(self, msg):
logger.info(msg)

if __name__ == "__main__":
app = QCoreApplication(sys.argv)

signals = Signals()
logger_module = LoggerModule(signals=signals)
manager = WorkerManager(signals=signals, process_count=5)

length = 50
op_list = random.choices(["+", "-"], k=length)
a_list = random.choices(range(1, 50), k=length)
b_list = random.choices(range(1, 50), k=length)

arguments = [(op, a, b) for op, a, b in zip(op_list, a_list, b_list)]
manager.run_workers(arguments)

sys.exit(app.exec())



Подробнее здесь: https://stackoverflow.com/questions/791 ... yside6-app
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»