Правильный способ многопроцессорной обработки в приложении PySide6 [закрыто]Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Правильный способ многопроцессорной обработки в приложении PySide6 [закрыто]

Сообщение Anonymous »

Я пишу приложение с графическим интерфейсом PySide6, в котором планирую реализовать модуль обработки (ProcessingModule) с переменным количеством выделенных процессов.

Модуль будет принимать информацию о требуемой функции и ее аргументах для выполнения.

Я намеревался использовать многопроцессорную библиотеку для этого случая.
Я пытаюсь достичь следующих ключевых моментов:
  • Поскольку в ProcessingModule не ожидается слишком много доступных процессов, ожидающие запросы будут храниться в стеке pending_tasks и выводиться наружу, как только некоторые из рабочих завершатся. >
  • В идеале класс Worker должен быть унифицирован для каждого типа функций и аргументов, которые он должен выполнять.
  • Я хотел бы передать результат Worker несколькими способами (или разными сигналами) в зависимости от того, какую функцию выполнил Worker.
  • Я также не хочу использовать таймеры для извлечения данных из pending_tasks, поскольку это приведет к ненужной задержке (более подходящим будет прямой обратный вызов, отправленный Worker по завершении)
Основная проблема:
Проблема, с которой я столкнулся, заключается в следующем: что мое приложение использует объекты PySide6.QtCore, которые невозможно выбрать, поэтому они не работают с созданием экземпляра multiprocessing.Process.

Хотя для ProcessingModule Чтобы обрабатывать входящие данные для сериализации, у меня возникли проблемы с выяснением того, как гибко обрабатывать выходные данные Worker.
Попытки решения:
  • Один из подходов, предложенных ChatGPT, заключался в создании стека ответов, который будет очищаться ProcessingModule и передаваться другим модулям.

    Однако этот вариант мне не нравится из-за набора функций, необходимых для гибкой обработки каждого типа ответа.
  • Другой вариант было бы определить класс Worker в отдельном файле, который будет запускаться QProcess. Изначально я хотел избежать создания дополнительных модулей, так как ожидал, что эта задача будет проще. Однако теперь этот подход начинает выглядеть наиболее подходящим.
Это моя первая попытка такого рода разработка приложений.

Я был бы благодарен за любой намек или, возможно, ссылку на подобные концепции.
Я бы предоставил оба кода, которые работают, но не используют QObjects
Я бы предоставил оба кода, которые работают, но не используют QObjects
code> (для аппроксимации желаемого решения) и код, который их содержит.
Пример решения:
import logging
import random
import sys
import time
from multiprocessing import Process
from typing import Callable, List

from PySide6.QtCore import QObject, QCoreApplication, Signal, QTimer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Signals(QObject):
"""A class to define shared signals."""
worker_done = Signal(object)

class Worker:
def __init__(self, method: Callable, arguments: dict):
self.method = method
self.arguments = arguments

def run(self):
worker_name = f"Worker "
logging.info(f"{worker_name}: started.")
self.method(**self.arguments)
logging.info(f"{worker_name}: finished.")

class WorkerManager:
def __init__(self, process_count=5):
super().__init__()

self.name = f"WorkerManager"
self.max_processes = process_count
self.running_processes: List[Process] = []
self.pending_tasks: List[(str, int, int)] = []

# Timer to periodically check for completed processes
self.timer = QTimer()
self.timer.timeout.connect(self._check_running_processes)
self.timer.start(100) # Check every 100 ms

def run_workers(self, pending_tasks):
self.pending_tasks.extend(pending_tasks)
self._assign_tasks()

def _assign_tasks(self):
while len(self.running_processes) < self.max_processes and self.pending_tasks:
operation, num_a, num_b = self.pending_tasks.pop(0)

match(operation):
case '+':
method = add
case '-':
method = sub

worker = Worker(method=method, arguments={"num_a": num_a, "num_b": num_b})

worker_process = Process(target=worker.run, name=f"Worker_A{operation}B:")
worker_process.start()

self.running_processes.append(worker_process)
logging.info(f"{self.name}: Started Worker_A{operation}B.")

def _check_running_processes(self):
# Check for completed processes
for process in self.running_processes[:]: # Copy list to allow modification
if not process.is_alive(): # Process has finished
process.join() # Clean up the process
self.running_processes.remove(process)
logging.info(f"{self.name}: Worker process completed and cleaned up.")

# Assign more tasks if there are available slots
self._assign_tasks()

# Exit application if all tasks and processes are complete
if not self.pending_tasks and not self.running_processes:
logging.info(f"{self.name}: All tasks completed. Exiting application.")
self.timer.stop()
QCoreApplication.quit()

def add(num_a, num_b):
time.sleep(1)
logger.info(f"{num_a}+{num_b}={num_a+num_b}")

def sub(num_a, num_b):
time.sleep(1)
logger.info(f"{num_a}-{num_b}={num_a-num_b}")

if __name__ == "__main__":
app = QCoreApplication(sys.argv)

signals = Signals()
manager = WorkerManager(process_count=5)

length = 50
op_list = random.choices(["+", "-"], k=length)
a_list = random.choices(range(1, 50), k=length)
b_list = random.choices(range(1, 50), k=length)

arguments = [(op, a, b) for op, a, b in zip(op_list, a_list, b_list)]
manager.run_workers(arguments)

sys.exit(app.exec())

Код, содержащий объекты QObject:
import logging
import random
import sys
import time
from multiprocessing import Process
from typing import Callable, List

from PySide6.QtCore import QObject, QCoreApplication, Signal, QTimer

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Signals(QObject):
"""A class to define shared signals."""
log_signal = Signal(str)
worker_done = Signal(object)

class Worker:
def __init__(self, method: Callable, arguments: dict):
self.method = method
self.arguments = arguments

def run(self):
worker_name = f"Worker "
logging.info(f"{worker_name}: started.")
self.method(**self.arguments)
logging.info(f"{worker_name}: finished.")

class WorkerManager:
def __init__(self, signals, process_count=5):
super().__init__()

self.name = f"WorkerManager"
self.max_processes = process_count
self.signals = signals
self.running_processes: List[Process] = []
self.pending_tasks: List[(str, int, int)] = []

self.signals.worker_done.connect(self._check_running_processes())

def run_workers(self, pending_tasks):
self.pending_tasks.extend(pending_tasks)
self._assign_tasks()

def _assign_tasks(self):
while len(self.running_processes) < self.max_processes and self.pending_tasks:
operation, num_a, num_b = self.pending_tasks.pop(0)

match(operation):
case '+':
method = self.add
case '-':
method = self.sub

worker = Worker(method=method, arguments={"num_a": num_a, "num_b": num_b})

worker_process = Process(target=worker.run, name=f"Worker_A{operation}B:")
worker_process.start()

self.running_processes.append(worker_process)
logging.info(f"{self.name}: Started Worker_A{operation}B.")

def _check_running_processes(self):
for process in self.running_processes[:]:
if not process.is_alive():
process.join()
self.running_processes.remove(process)
logging.info(f"{self.name}: Worker process completed and cleaned up.")

self._assign_tasks()

if not self.pending_tasks and not self.running_processes:
logging.info(f"{self.name}: All tasks completed. Exiting application.")
QCoreApplication.quit()

def add(self, num_a, num_b):
time.sleep(1)
logger.info(f"A+B={num_a+num_b}")

def sub(self, num_a, num_b):
time.sleep(1)
logger.info(f"A-B={num_a-num_b}")

def handle_add_result(self, result):
out = f"Result of A+B={result}."
logger.info(out)
self.signals.log_signal.emit(out)

def handle_sub_result(self, result):
out = f"Result of A-B={result}."
logger.info(out)
self.signals.log_signal.emit(out)

class LoggerModule(QObject):
def __init__(self, signals):
super().__init__()
self.signals = signals
self.signals.log_signal.connect(self.log_func)

def log_func(self, msg):
logger.info(msg)

if __name__ == "__main__":
app = QCoreApplication(sys.argv)

signals = Signals()
logger_module = LoggerModule(signals=signals)
manager = WorkerManager(signals=signals, process_count=5)

length = 50
op_list = random.choices(["+", "-"], k=length)
a_list = random.choices(range(1, 50), k=length)
b_list = random.choices(range(1, 50), k=length)

arguments = [(op, a, b) for op, a, b in zip(op_list, a_list, b_list)]
manager.run_workers(arguments)

sys.exit(app.exec())



Подробнее здесь: https://stackoverflow.com/questions/791 ... yside6-app
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Правильный способ многопроцессорной обработки в приложении PySide6
    Anonymous » » в форуме Python
    0 Ответы
    29 Просмотры
    Последнее сообщение Anonymous
  • Повышает ли нарезка кадров данных в многопроцессорной/многопроцессорной обработке производительность?
    Anonymous » » в форуме Python
    0 Ответы
    32 Просмотры
    Последнее сообщение Anonymous
  • Правильный способ запуска QProcess в приложении PySide6
    Anonymous » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Правильный способ запуска QProcess в приложении PySide6 [дубликат]
    Anonymous » » в форуме Python
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous
  • Есть ли способ завершить процесс отслеживания ресурсов многопроцессорной обработки Python?
    Anonymous » » в форуме Python
    0 Ответы
    109 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»