В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные из процесса готовы, он отправляет данные в QThread через многопроцессорный канал, а затем QThread генерирует pyqtSignal, который реализован в родительском классе (pyqt5).
Во многих случаях, когда я пытаюсь отправить данные в QThread (в большинстве случаев), я сталкиваюсь с этой ошибкой:
Traceback (most recent call last):
File "C:\Users\cpapp\Documents\My Projects\web-radio-studio\src\python+\main-window\page-4\player-list.py", line 3601, in get_player_list
self.to_emitter.send({"type":"fetch-result","result":self.player_list_items})
File "C:\Python\lib\multiprocessing\connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\lib\multiprocessing\connection.py", line 295, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended
Эта ошибка возникает во многих исполнениях pycharm .py.
Как ее избежать и как решить?
Пример кода :
main_ui.py
from PyQt5 import QtCore, QtGui, QtWidgets
import sys
import os
from multiprocessing import freeze_support, Queue
from multiprocessing import Condition, Value, Process, Event
import threading
from PyQt5.QtCore import pyqtSignal, QThread, Qt
from multiprocessing import Process, Queue, Pipe
from main_ui import Ui_MainWindow
import time
class Example_1:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.MainWindow = QtWidgets.QMainWindow()
self.ui = Ui_MainWindow()
self.ui.setupUi(self.MainWindow)
self.MainWindow.show()
self.condition = Condition()
self.frame = Value('q', lock=False)
self.quit_event = Event()
self.tr = threading.Thread(target=self.infinite_heartbreat)
self.tr.start()
self.processes = 30
self.mother_pipes = []
self.child_pipes = []
self.queues = []
self.emitters = []
self.child_processes = []
for i in range(0,self.processes):
self.mother_pipes.append(None)
self.child_pipes.append(None)
self.mother_pipes[i], self.child_pipes[i] = Pipe()
self.queues.append(Queue())
self.emitters.append(Emitter(self.mother_pipes[i]))
self.emitters[i].signal_1.connect(self.signal_1_handler)
self.emitters[i].signal_2.connect(self.signal_2_handler)
self.emitters[i].signal_3.connect(self.signal_3_handler)
self.emitters[i].start()
self.child_processes.append(Child_Proc(self.child_pipes[i], self.queues[i],self.condition, self.frame, self.quit_event))
self.child_processes[i].start()
#put some data for test
self.timer = QtCore.QTimer() # set up your QTimer
self.timer.timeout.connect(self.put_data_to_queues)
self.timer.setSingleShot(True)
self.timer.start(125) # set it to timeout in 200 ms
self.MainWindow.closeEvent = lambda event: self.closeEvent(event)
sys.exit(self.app.exec_())
def infinite_heartbreat(self):
next_beat = time.time()
while True:
next_beat += 0.125
time_to_sleep = next_beat - time.time()
if time_to_sleep > 0:
time.sleep(time_to_sleep)
with self.condition:
self.frame.value += 1
self.condition.notify_all()
if self.quit_event.is_set():
print('END')
return
def signal_1_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
for process in range(0,self.processes):
self.queues[process].put({"type":"signal_2","input":result})
def signal_2_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
for process in range(0,self.processes):
self.queues[process].put({"type":"signal_3","input":result})
def signal_3_handler(self,result):
#do something with the result in pyqt5 window (display in QLabel for example)
print(result)
for process in range(0,self.processes):
self.queues[process].put({"type":"signal_1","input":result})
def put_data_to_queues(self):
for process in range(0,self.processes):
for i in range(1,10):
self.queues[process].put({"type":"signal_1","input":i})
self.queues[process].put({"type":"signal_2","input":i})
self.queues[process].put({"type":"signal_3","input":i})
def closeEvent(self,event):
self.quit_event.set()
self.tr.join()
time.sleep(0.5)
for i in range(0,self.processes):
self.emitters[i].signal_1.disconnect()
self.emitters[i].signal_2.disconnect()
self.emitters[i].signal_3.disconnect()
while(self.queues[i].qsize()!=0):
tmp = self.queues[i].get()
self.child_processes[i].terminate()
self.emitters[i].quit()
#print(os.getpid())
event.accept()
class Emitter(QThread):
signal_1 = pyqtSignal(int)
signal_2 = pyqtSignal(int)
signal_3 = pyqtSignal(int)
def __init__(self, from_process: Pipe):
super().__init__()
self.data_from_process = from_process
def run(self):
while True:
if self.data_from_process.poll():
data = self.data_from_process.recv()
else:
time.sleep(0.1)
continue
if data["type"]=="signal_1":
self.signal_1.emit(data["result"])
elif data["type"]=="signal_2":
self.signal_2.emit(data["result"])
elif data["type"]=="signal_3":
self.signal_3.emit(data["result"])
elif data["type"]=="stop":
return None
class Child_Proc(Process):
def __init__(self, to_emitter, from_mother,condition, frame_number, quit_event):
super().__init__()
self.daemon = True
self.to_emitter = to_emitter
self.data_from_mother = from_mother
self.condition = condition
self.frame_number = frame_number
self.quit_event = quit_event
def run(self):
current_frame = 0
while (True):
with self.condition:
self.condition.wait_for(lambda: current_frame
Подробнее здесь: [url]https://stackoverflow.com/questions/79177633/how-to-solve-broken-pipe-error-in-case-of-qthread-multiprocessing-process[/url]
В моей основной программе pyqt5 я создаю экземпляры некоторых классов. Каждый из этих классов создает экземпляры QThread и Process. Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки. Когда данные из процесса готовы, он отправляет данные в QThread через многопроцессорный канал, а затем QThread генерирует pyqtSignal, который реализован в родительском классе (pyqt5). Во многих случаях, когда я пытаюсь отправить данные в QThread (в большинстве случаев), я сталкиваюсь с этой ошибкой: [code]Traceback (most recent call last): File "C:\Users\cpapp\Documents\My Projects\web-radio-studio\src\python+\main-window\page-4\player-list.py", line 3601, in get_player_list self.to_emitter.send({"type":"fetch-result","result":self.player_list_items}) File "C:\Python\lib\multiprocessing\connection.py", line 211, in send self._send_bytes(_ForkingPickler.dumps(obj)) File "C:\Python\lib\multiprocessing\connection.py", line 295, in _send_bytes nwritten, err = ov.GetOverlappedResult(True) BrokenPipeError: [WinError 109] The pipe has been ended [/code] Эта ошибка возникает во многих исполнениях pycharm .py. Как ее избежать и как решить? Пример кода : main_ui.py [code]from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object): def setupUi(self, MainWindow): MainWindow.setObjectName("MainWindow") MainWindow.resize(300, 133) self.centralwidget = QtWidgets.QWidget(MainWindow) self.centralwidget.setObjectName("centralwidget") MainWindow.setCentralWidget(self.centralwidget) [/code] main.py [code]from PyQt5 import QtCore, QtGui, QtWidgets import sys import os from multiprocessing import freeze_support, Queue from multiprocessing import Condition, Value, Process, Event import threading
from PyQt5.QtCore import pyqtSignal, QThread, Qt from multiprocessing import Process, Queue, Pipe
for i in range(0,self.processes): self.mother_pipes.append(None) self.child_pipes.append(None) self.mother_pipes[i], self.child_pipes[i] = Pipe() self.queues.append(Queue()) self.emitters.append(Emitter(self.mother_pipes[i])) self.emitters[i].signal_1.connect(self.signal_1_handler) self.emitters[i].signal_2.connect(self.signal_2_handler) self.emitters[i].signal_3.connect(self.signal_3_handler)
#put some data for test self.timer = QtCore.QTimer() # set up your QTimer self.timer.timeout.connect(self.put_data_to_queues) self.timer.setSingleShot(True) self.timer.start(125) # set it to timeout in 200 ms
def infinite_heartbreat(self): next_beat = time.time() while True: next_beat += 0.125 time_to_sleep = next_beat - time.time() if time_to_sleep > 0: time.sleep(time_to_sleep) with self.condition: self.frame.value += 1 self.condition.notify_all() if self.quit_event.is_set(): print('END') return
def signal_1_handler(self,result): #do something with the result in pyqt5 window (display in QLabel for example) print(result) for process in range(0,self.processes): self.queues[process].put({"type":"signal_2","input":result})
def signal_2_handler(self,result): #do something with the result in pyqt5 window (display in QLabel for example) print(result) for process in range(0,self.processes): self.queues[process].put({"type":"signal_3","input":result})
def signal_3_handler(self,result): #do something with the result in pyqt5 window (display in QLabel for example) print(result) for process in range(0,self.processes): self.queues[process].put({"type":"signal_1","input":result})
def put_data_to_queues(self): for process in range(0,self.processes): for i in range(1,10): self.queues[process].put({"type":"signal_1","input":i}) self.queues[process].put({"type":"signal_2","input":i}) self.queues[process].put({"type":"signal_3","input":i})
def closeEvent(self,event): self.quit_event.set() self.tr.join() time.sleep(0.5) for i in range(0,self.processes): self.emitters[i].signal_1.disconnect() self.emitters[i].signal_2.disconnect() self.emitters[i].signal_3.disconnect() while(self.queues[i].qsize()!=0): tmp = self.queues[i].get() self.child_processes[i].terminate() self.emitters[i].quit()
В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные...
В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные...
В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные...
В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные...
В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные...