Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные из процесса готовы, он отправляет данные в QThread через многопроцессорный канал, а затем QThread генерирует pyqtSignal, который реализован в родительском классе (pyqt5).
Во многих случаях, когда я пытаюсь отправить данные в QThread (в большинстве случаев), я сталкиваюсь с этой ошибкой:
Код: Выделить всё
Traceback (most recent call last):
File "C:\Users\cpapp\Documents\My Projects\web-radio-studio\src\python+\main-window\page-4\player-list.py", line 3601, in get_player_list
self.to_emitter.send({"type":"fetch-result","result":self.player_list_items})
File "C:\Python\lib\multiprocessing\connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\lib\multiprocessing\connection.py", line 295, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended
Как ее избежать и как решить?
Пример кода :
main_ui.py
Код: Выделить всё
from PyQt5 import QtCore, QtGui, QtWidgets
import sys
import os
from multiprocessing import freeze_support, Queue
from PyQt5.QtCore import pyqtSignal, QThread, Qt
from multiprocessing import Process, Queue, Pipe
from main_ui import Ui_MainWindow
import time
class Example_1:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.MainWindow = QtWidgets.QMainWindow()
self.ui = Ui_MainWindow()
self.ui.setupUi(self.MainWindow)
self.MainWindow.show()
self.processes = 30
self.mother_pipes = []
self.child_pipes = []
self.queues = []
self.emitters = []
self.child_processes = []
for i in range(0,self.processes):
self.mother_pipes.append(None)
self.child_pipes.append(None)
self.mother_pipes[i], self.child_pipes[i] = Pipe()
self.queues.append(Queue())
self.emitters.append(Emitter(self.mother_pipes[i]))
self.emitters[i].signal_1.connect(self.signal_1_handler)
self.emitters[i].signal_2.connect(self.signal_2_handler)
self.emitters[i].signal_3.connect(self.signal_3_handler)
self.emitters[i].start()
self.child_processes.append(Child_Proc(self.child_pipes[i], self.queues[i]))
self.child_processes[i].start()
#put some data for test
for process in range(0,self.processes):
for i in range(0,100):
self.queues[process].put({"type":"signal_1","input":i})
for i in range(0,100):
self.queues[process].put({"type":"signal_2","input":i})
for i in range(0,100):
self.queues[process].put({"type":"signal_3","input":i})
self.MainWindow.closeEvent = lambda event: self.closeEvent(event)
sys.exit(self.app.exec_())
def signal_1_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
pass
def signal_2_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
pass
def signal_3_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
pass
def closeEvent(self,event):
for i in range(0,self.processes):
self.child_processes[i].terminate()
self.emitters[i].quit()
event.accept()
class Emitter(QThread):
signal_1 = pyqtSignal(int)
signal_2 = pyqtSignal(int)
signal_3 = pyqtSignal(int)
def __init__(self, from_process: Pipe):
super().__init__()
self.data_from_process = from_process
def run(self):
while True:
if self.data_from_process.poll():
data = self.data_from_process.recv()
else:
time.sleep(0.1)
continue
if data["type"]=="signal_1":
self.signal_1.emit(data["result"])
elif data["type"]=="signal_2":
self.signal_2.emit(data["result"])
elif data["type"]=="signal_3":
self.signal_3.emit(data["result"])
class Child_Proc(Process):
def __init__(self, to_emitter, from_mother):
super().__init__()
self.daemon = False
self.to_emitter = to_emitter
self.data_from_mother = from_mother
def run(self):
while(True):
data = self.data_from_mother.get()
if data["type"] == "signal_1":
result = self.signal_1_method(data["input"])
self.to_emitter.send({"type":"signal_1","result":result})
elif data["type"] == "signal_2":
result = self.signal_2_method(data["input"])
self.to_emitter.send({"type":"signal_2","result":result})
elif data["type"] == "signal_3":
result = self.signal_3_method(data["input"])
self.to_emitter.send({"type":"signal_3","result":result})
def signal_1_method(self,input_int):
# some long operation here
return input_int*10
def signal_2_method(self,input_int):
# some long operation here
return input_int*100
def signal_3_method(self,input_int):
# some long operation here
return input_int*1000
if __name__ == "__main__":
os.environ["QT_AUTO_SCREEN_SCALE_FACTOR"] = "1"
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
freeze_support()
program = Example_1()
Это то, что мне нужно?
Возможно, ошибка не связана с многопроцессорностью/QThread, но для этого я задаю здесь вопрос, чтобы выяснить, связана ли ошибка с реализацией.
Подробнее здесь: https://stackoverflow.com/questions/791 ... ng-process