Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?

Сообщение Anonymous »

В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные из процесса готовы, он отправляет данные в QThread через многопроцессорный канал, а затем QThread генерирует pyqtSignal, который реализован в родительском классе (pyqt5).
Во многих случаях, когда я пытаюсь отправить данные в QThread (в большинстве случаев), я сталкиваюсь с этой ошибкой:

Код: Выделить всё

Traceback (most recent call last):
File "C:\Users\cpapp\Documents\My Projects\web-radio-studio\src\python+\main-window\page-4\player-list.py", line 3601, in get_player_list
self.to_emitter.send({"type":"fetch-result","result":self.player_list_items})
File "C:\Python\lib\multiprocessing\connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\lib\multiprocessing\connection.py", line 295, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended
Эта ошибка возникает во многих исполнениях pycharm .py.
Как ее избежать и как решить?
Пример кода :
main_ui.py

Код: Выделить всё

from PyQt5 import QtCore, QtGui, QtWidgets
import sys
import os
from multiprocessing import freeze_support, Queue

from PyQt5.QtCore import pyqtSignal, QThread, Qt
from multiprocessing import Process, Queue, Pipe

from main_ui import Ui_MainWindow
import time

class Example_1:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.MainWindow = QtWidgets.QMainWindow()
self.ui = Ui_MainWindow()
self.ui.setupUi(self.MainWindow)
self.MainWindow.show()

self.processes = 30
self.mother_pipes = []
self.child_pipes = []
self.queues = []
self.emitters = []
self.child_processes = []

for i in range(0,self.processes):
self.mother_pipes.append(None)
self.child_pipes.append(None)
self.mother_pipes[i], self.child_pipes[i] = Pipe()
self.queues.append(Queue())
self.emitters.append(Emitter(self.mother_pipes[i]))
self.emitters[i].signal_1.connect(self.signal_1_handler)
self.emitters[i].signal_2.connect(self.signal_2_handler)
self.emitters[i].signal_3.connect(self.signal_3_handler)

self.emitters[i].start()
self.child_processes.append(Child_Proc(self.child_pipes[i], self.queues[i]))
self.child_processes[i].start()

#put some data for test
for process in range(0,self.processes):
for i in range(0,100):
self.queues[process].put({"type":"signal_1","input":i})
for i in range(0,100):
self.queues[process].put({"type":"signal_2","input":i})
for i in range(0,100):
self.queues[process].put({"type":"signal_3","input":i})

self.MainWindow.closeEvent = lambda event: self.closeEvent(event)

sys.exit(self.app.exec_())

def signal_1_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
pass

def signal_2_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
pass

def signal_3_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
pass

def closeEvent(self,event):
for i in range(0,self.processes):
self.child_processes[i].terminate()
self.emitters[i].quit()
event.accept()

class Emitter(QThread):
signal_1 = pyqtSignal(int)
signal_2 = pyqtSignal(int)
signal_3 = pyqtSignal(int)

def __init__(self, from_process:  Pipe):
super().__init__()
self.data_from_process = from_process

def run(self):
while True:
if self.data_from_process.poll():
data = self.data_from_process.recv()
else:
time.sleep(0.1)
continue
if data["type"]=="signal_1":
self.signal_1.emit(data["result"])
elif data["type"]=="signal_2":
self.signal_2.emit(data["result"])
elif data["type"]=="signal_3":
self.signal_3.emit(data["result"])

class Child_Proc(Process):

def __init__(self, to_emitter, from_mother):
super().__init__()
self.daemon = False
self.to_emitter = to_emitter
self.data_from_mother = from_mother

def run(self):
while(True):
data = self.data_from_mother.get()
if data["type"] == "signal_1":
result = self.signal_1_method(data["input"])
self.to_emitter.send({"type":"signal_1","result":result})
elif data["type"] == "signal_2":
result = self.signal_2_method(data["input"])
self.to_emitter.send({"type":"signal_2","result":result})
elif data["type"] == "signal_3":
result = self.signal_3_method(data["input"])
self.to_emitter.send({"type":"signal_3","result":result})

def signal_1_method(self,input_int):
# some long operation here
return input_int*10

def signal_2_method(self,input_int):
# some long operation here
return input_int*100

def signal_3_method(self,input_int):
# some long operation here
return input_int*1000

if __name__ == "__main__":
os.environ["QT_AUTO_SCREEN_SCALE_FACTOR"] = "1"
if getattr(sys, 'frozen', False) and hasattr(sys, '_MEIPASS'):
freeze_support()
program = Example_1()
Один хороший совет — использовать только один QThread и множество процессов.
Это то, что мне нужно?
Возможно, ошибка не связана с многопроцессорностью/QThread, но для этого я задаю здесь вопрос, чтобы выяснить, связана ли ошибка с реализацией.

Подробнее здесь: https://stackoverflow.com/questions/791 ... ng-process
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    17 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»