Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?

Сообщение Anonymous »

В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные из процесса готовы, он отправляет данные в QThread через многопроцессорный канал, а затем QThread генерирует pyqtSignal, который реализован в родительском классе (pyqt5).
Во многих случаях, когда я пытаюсь отправить данные в QThread (в большинстве случаев), я сталкиваюсь с этой ошибкой:

Код: Выделить всё

Traceback (most recent call last):
File "C:\Users\cpapp\Documents\My Projects\web-radio-studio\src\python+\main-window\page-4\player-list.py", line 3601, in get_player_list
self.to_emitter.send({"type":"fetch-result","result":self.player_list_items})
File "C:\Python\lib\multiprocessing\connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\lib\multiprocessing\connection.py", line 295, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended
Эта ошибка возникает во многих исполнениях pycharm .py.
Как ее избежать и как решить?
Пример кода :
main_ui.py

Код: Выделить всё

from PyQt5 import QtCore, QtGui, QtWidgets

class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(300, 133)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
MainWindow.setCentralWidget(self.centralwidget)
main.py

Код: Выделить всё

from PyQt5 import QtCore, QtGui, QtWidgets
import sys
import os
from multiprocessing import freeze_support, Queue
from multiprocessing import Condition, Value, Process, Event
import threading

from PyQt5.QtCore import pyqtSignal, QThread, Qt
from multiprocessing import Process, Queue, Pipe

from main_ui import Ui_MainWindow
import time

class Example_1:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.MainWindow = QtWidgets.QMainWindow()
self.ui = Ui_MainWindow()
self.ui.setupUi(self.MainWindow)
self.MainWindow.show()

self.condition = Condition()
self.frame = Value('q', lock=False)
self.quit_event = Event()
self.tr = threading.Thread(target=self.infinite_heartbreat)
self.tr.start()

self.processes = 30
self.mother_pipes = []
self.child_pipes = []
self.queues = []
self.emitters = []
self.child_processes = []

for i in range(0,self.processes):
self.mother_pipes.append(None)
self.child_pipes.append(None)
self.mother_pipes[i], self.child_pipes[i] = Pipe()
self.queues.append(Queue())
self.emitters.append(Emitter(self.mother_pipes[i]))
self.emitters[i].signal_1.connect(self.signal_1_handler)
self.emitters[i].signal_2.connect(self.signal_2_handler)
self.emitters[i].signal_3.connect(self.signal_3_handler)

self.emitters[i].start()
self.child_processes.append(Child_Proc(self.child_pipes[i], self.queues[i],self.condition, self.frame, self.quit_event))
self.child_processes[i].start()

#put some data for test
self.timer = QtCore.QTimer()  # set up your QTimer
self.timer.timeout.connect(self.put_data_to_queues)
self.timer.setSingleShot(True)
self.timer.start(125)  # set it to timeout in 200 ms

self.MainWindow.closeEvent = lambda event: self.closeEvent(event)

sys.exit(self.app.exec_())

def infinite_heartbreat(self):
next_beat = time.time()
while True:
next_beat += 0.125
time_to_sleep = next_beat - time.time()
if time_to_sleep >  0:
time.sleep(time_to_sleep)
with self.condition:
self.frame.value += 1
self.condition.notify_all()
if self.quit_event.is_set():
print('END')
return

def signal_1_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
for process in range(0,self.processes):
self.queues[process].put({"type":"signal_2","input":result})

def signal_2_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
for process in range(0,self.processes):
self.queues[process].put({"type":"signal_3","input":result})

def signal_3_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
for process in range(0,self.processes):
self.queues[process].put({"type":"signal_1","input":result})

def put_data_to_queues(self):
for process in range(0,self.processes):
for i in range(1,10):
self.queues[process].put({"type":"signal_1","input":i})
self.queues[process].put({"type":"signal_2","input":i})
self.queues[process].put({"type":"signal_3","input":i})

def closeEvent(self,event):
self.quit_event.set()
self.tr.join()
time.sleep(0.5)
for i in range(0,self.processes):
self.emitters[i].signal_1.disconnect()
self.emitters[i].signal_2.disconnect()
self.emitters[i].signal_3.disconnect()
while(self.queues[i].qsize()!=0):
tmp = self.queues[i].get()
self.child_processes[i].terminate()
self.emitters[i].quit()

#print(os.getpid())
event.accept()

class Emitter(QThread):
signal_1 = pyqtSignal(int)
signal_2 = pyqtSignal(int)
signal_3 = pyqtSignal(int)

def __init__(self, from_process: Pipe):
super().__init__()
self.data_from_process = from_process

def run(self):
while True:
if self.data_from_process.poll():
data = self.data_from_process.recv()
else:
time.sleep(0.1)
continue
if data["type"]=="signal_1":
self.signal_1.emit(data["result"])
elif data["type"]=="signal_2":
self.signal_2.emit(data["result"])
elif data["type"]=="signal_3":
self.signal_3.emit(data["result"])
elif data["type"]=="stop":
return None

class Child_Proc(Process):

def __init__(self, to_emitter, from_mother,condition, frame_number, quit_event):
super().__init__()
self.daemon = True
self.to_emitter = to_emitter
self.data_from_mother = from_mother

self.condition = condition
self.frame_number = frame_number
self.quit_event = quit_event

def run(self):
current_frame = 0
while (True):
with self.condition:
self.condition.wait_for(lambda:  current_frame 

Подробнее здесь: [url]https://stackoverflow.com/questions/79177633/how-to-solve-broken-pipe-error-in-case-of-qthread-multiprocessing-process[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    14 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»