Модуль будет принимать информацию о требуемой функции и ее аргументах для выполнения.
В этом случае я намеревался использовать многопроцессорную библиотеку.
Я пытаюсь достичь следующих ключевых моментов:
- Как ProcessingModule не ожидается, что будет слишком много доступных процессов, ожидающие запросы будут сохранены в pending_tasks складываются и выскакивают, как только некоторые из Workers завершаются.
- В идеале класс Worker должен быть унифицирован для каждого тип функции и аргументы, которые она должна выполнять.
- Я хотел бы выдавать результат Worker несколькими способами (или разными сигналами), в зависимости от того, какой функция Worker выполнена.
- Я также не хочу использовать таймеры для извлечения данных из pending_tasks, так как это приведет к ненужной задержке (более подходящим будет прямой обратный вызов, отправленный Worker по завершении)
Проблема, с которой я столкнулся, заключается в том, что мой приложение использует объекты PySide6.QtCore, которые не являются Pickelable, поэтому не работает с созданием экземпляра multiprocessing.Process.
Хотя ProcessingModule может обрабатывать входящие данные для сериализации, у меня есть проблема с выяснением того, как гибко обрабатывать Worker< /code> вывод.
Попытки решения:
- Один из подходов предложение ChatGPT заключалось в создании стек ответов, который будет очищен ProcessingModule и отправлен другим модулям.
Однако мне не нравится этот вариант из-за набора функций, необходимых для гибкой обработки каждого типа ответа. - Следующим вариантом будет определение класса Worker в отдельном файле, который будет запускаться QProcess. Изначально я хотел избежать создания дополнительных модулей, так как ожидал, что эта задача будет проще. Однако сейчас этот подход кажется наиболее подходящим.
Я был бы благодарен за любой намек или, возможно, ссылку на подобные концепции.
Я бы предоставил оба кода, которые работают, но не используют QObjects (чтобы приблизить желаемое решение) и код, который их содержит.
Пример решение:
import logging
import random
import sys
import time
from multiprocessing import Process
from typing import Callable, List
from PySide6.QtCore import QObject, QCoreApplication, Signal, QTimer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Signals(QObject):
"""A class to define shared signals."""
worker_done = Signal(object)
class Worker:
def __init__(self, method: Callable, arguments: dict):
self.method = method
self.arguments = arguments
def run(self):
worker_name = f"Worker "
logging.info(f"{worker_name}: started.")
self.method(**self.arguments)
logging.info(f"{worker_name}: finished.")
class WorkerManager:
def __init__(self, process_count=5):
super().__init__()
self.name = f"WorkerManager"
self.max_processes = process_count
self.running_processes: List[Process] = []
self.pending_tasks: List[(str, int, int)] = []
# Timer to periodically check for completed processes
self.timer = QTimer()
self.timer.timeout.connect(self._check_running_processes)
self.timer.start(100) # Check every 100 ms
def run_workers(self, pending_tasks):
self.pending_tasks.extend(pending_tasks)
self._assign_tasks()
def _assign_tasks(self):
while len(self.running_processes) < self.max_processes and self.pending_tasks:
operation, num_a, num_b = self.pending_tasks.pop(0)
match(operation):
case '+':
method = add
case '-':
method = sub
worker = Worker(method=method, arguments={"num_a": num_a, "num_b": num_b})
worker_process = Process(target=worker.run, name=f"Worker_A{operation}B:")
worker_process.start()
self.running_processes.append(worker_process)
logging.info(f"{self.name}: Started Worker_A{operation}B.")
def _check_running_processes(self):
# Check for completed processes
for process in self.running_processes[:]: # Copy list to allow modification
if not process.is_alive(): # Process has finished
process.join() # Clean up the process
self.running_processes.remove(process)
logging.info(f"{self.name}: Worker process completed and cleaned up.")
# Assign more tasks if there are available slots
self._assign_tasks()
# Exit application if all tasks and processes are complete
if not self.pending_tasks and not self.running_processes:
logging.info(f"{self.name}: All tasks completed. Exiting application.")
self.timer.stop()
QCoreApplication.quit()
def add(num_a, num_b):
time.sleep(1)
logger.info(f"{num_a}+{num_b}={num_a+num_b}")
def sub(num_a, num_b):
time.sleep(1)
logger.info(f"{num_a}-{num_b}={num_a-num_b}")
if __name__ == "__main__":
app = QCoreApplication(sys.argv)
signals = Signals()
manager = WorkerManager(process_count=5)
length = 50
op_list = random.choices(["+", "-"], k=length)
a_list = random.choices(range(1, 50), k=length)
b_list = random.choices(range(1, 50), k=length)
arguments = [(op, a, b) for op, a, b in zip(op_list, a_list, b_list)]
manager.run_workers(arguments)
sys.exit(app.exec())
Код, содержащий объекты QObject:
import logging
import random
import sys
import time
from multiprocessing import Process
from typing import Callable, List
from PySide6.QtCore import QObject, QCoreApplication, Signal, QTimer
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class Signals(QObject):
"""A class to define shared signals."""
log_signal = Signal(str)
worker_done = Signal(object)
class Worker:
def __init__(self, method: Callable, arguments: dict):
self.method = method
self.arguments = arguments
def run(self):
worker_name = f"Worker "
logging.info(f"{worker_name}: started.")
self.method(**self.arguments)
logging.info(f"{worker_name}: finished.")
class WorkerManager:
def __init__(self, signals, process_count=5):
super().__init__()
self.name = f"WorkerManager"
self.max_processes = process_count
self.signals = signals
self.running_processes: List[Process] = []
self.pending_tasks: List[(str, int, int)] = []
self.signals.worker_done.connect(self._check_running_processes())
def run_workers(self, pending_tasks):
self.pending_tasks.extend(pending_tasks)
self._assign_tasks()
def _assign_tasks(self):
while len(self.running_processes) < self.max_processes and self.pending_tasks:
operation, num_a, num_b = self.pending_tasks.pop(0)
match(operation):
case '+':
method = self.add
case '-':
method = self.sub
worker = Worker(method=method, arguments={"num_a": num_a, "num_b": num_b})
worker_process = Process(target=worker.run, name=f"Worker_A{operation}B:")
worker_process.start()
self.running_processes.append(worker_process)
logging.info(f"{self.name}: Started Worker_A{operation}B.")
def _check_running_processes(self):
for process in self.running_processes[:]:
if not process.is_alive():
process.join()
self.running_processes.remove(process)
logging.info(f"{self.name}: Worker process completed and cleaned up.")
self._assign_tasks()
if not self.pending_tasks and not self.running_processes:
logging.info(f"{self.name}: All tasks completed. Exiting application.")
QCoreApplication.quit()
def add(self, num_a, num_b):
time.sleep(1)
logger.info(f"A+B={num_a+num_b}")
def sub(self, num_a, num_b):
time.sleep(1)
logger.info(f"A-B={num_a-num_b}")
def handle_add_result(self, result):
out = f"Result of A+B={result}."
logger.info(out)
self.signals.log_signal.emit(out)
def handle_sub_result(self, result):
out = f"Result of A-B={result}."
logger.info(out)
self.signals.log_signal.emit(out)
class LoggerModule(QObject):
def __init__(self, signals):
super().__init__()
self.signals = signals
self.signals.log_signal.connect(self.log_func)
def log_func(self, msg):
logger.info(msg)
if __name__ == "__main__":
app = QCoreApplication(sys.argv)
signals = Signals()
logger_module = LoggerModule(signals=signals)
manager = WorkerManager(signals=signals, process_count=5)
length = 50
op_list = random.choices(["+", "-"], k=length)
a_list = random.choices(range(1, 50), k=length)
b_list = random.choices(range(1, 50), k=length)
arguments = [(op, a, b) for op, a, b in zip(op_list, a_list, b_list)]
manager.run_workers(arguments)
sys.exit(app.exec())
Подробнее здесь: https://stackoverflow.com/questions/791 ... yside6-app