Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?

Сообщение Anonymous »

В моей основной программе pyqt5 я создаю экземпляры некоторых классов.
Каждый из этих классов создает экземпляры QThread и Process.
Также каждый из этих классов отправляет данные для обработки через очередь многопроцессорной обработки.
Когда данные из процесса готовы, он отправляет данные в QThread через многопроцессорный канал, а затем QThread генерирует pyqtSignal, который реализован в родительском классе (pyqt5).
Во многих случаях, когда я пытаюсь отправить данные в QThread (в большинстве случаев), я сталкиваюсь с этой ошибкой:
Traceback (most recent call last):
File "C:\Users\cpapp\Documents\My Projects\web-radio-studio\src\python+\main-window\page-4\player-list.py", line 3601, in get_player_list
self.to_emitter.send({"type":"fetch-result","result":self.player_list_items})
File "C:\Python\lib\multiprocessing\connection.py", line 211, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\lib\multiprocessing\connection.py", line 295, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
BrokenPipeError: [WinError 109] The pipe has been ended

Эта ошибка возникает во многих исполнениях pycharm .py.
Как ее избежать и как решить?
Пример кода :
import subprocess
import re

from PyQt5 import QtCore,QtGui
from PyQt5.QtCore import pyqtSignal, QThread, Qt
from multiprocessing import Process, Queue, Pipe
import sys
from datetime import datetime, timedelta
import time
import traceback

import wmi
import cv2

sys.path.append("../")
sys.path.append("../../")
sys.path.append("../../../")
sys.path.append("../../../../")
sys.path.append("../../../../../")
sys.path.append("../../../../../../")
sys.path.append("../../../../../../..")

import importlib

database_functions = importlib.import_module("python+.lib.sqlite3-functions")

class Support_Ui_Dialog:

def __init__(self,main_self):
try:
self.main_self = main_self
#apply theme
self.main_self.manage_camera_input_device_window.setStyleSheet("*{font-family:"+self.main_self.default_font+";font-size:"+self.main_self.default_font_size+"px;color:"+self.main_self.default_font_color+";}QDialog{background:"+self.main_self.default_background_color+"}QPushButton, QComboBox{background:"+self.main_self.default_buttons_background+";color:"+self.main_self.default_buttons_font_color+"}")

self.need_save = False

#disable some buttons - comboboxes until camera devices are available
self.main_self.ui_manage_camera_input_device_window.select_camera.setEnabled(False)
self.main_self.ui_manage_camera_input_device_window.camera_start.setEnabled(False)
self.main_self.ui_manage_camera_input_device_window.camera_stop.setEnabled(False)
self.main_self.ui_manage_camera_input_device_window.save.setEnabled(False)

#create process
self.process_number = 111
self.manage_camera_input_device_mother_pipe, self.manage_camera_input_device_child_pipe = Pipe()
self.manage_camera_input_device_queue = Queue()
self.manage_camera_input_device_emitter = Manage_Camera_Input_Device_Emitter(self.manage_camera_input_device_mother_pipe)
self.manage_camera_input_device_emitter.error_signal.connect(lambda error_message:self.close_window_with_error_timer(error_message))
self.manage_camera_input_device_emitter.image_ready_signal.connect(self.on_image_ready)
self.manage_camera_input_device_emitter.devices_settings.connect(self.display_devices_and_settings)
self.manage_camera_input_device_emitter.save_finished.connect(self.save_finished)

self.manage_camera_input_device_emitter.start()
self.manage_camera_input_device_child_process = Manage_Camera_Input_Device_Child_Proc(self.manage_camera_input_device_child_pipe, self.manage_camera_input_device_queue)
self.manage_camera_input_device_child_process.start()

counter = 0
for process in self.main_self.manage_processes_instance.processes:
if "process_number" in process:
if process["process_number"]==self.process_number:
self.main_self.manage_processes_instance.processes[counter]["pid"] = self.manage_camera_input_device_child_process.pid
self.main_self.manage_processes_instance.processes[counter]["start_datetime"] = datetime.now()
self.main_self.manage_processes_instance.processes[counter]["status"] = "in_progress"
counter += 1

if self.main_self.manage_proccesses_window_is_open:
self.main_self.manage_proccesses_window_support_code.manage_proccesses_queue.put({"type":"table-update","processes":self.main_self.manage_processes_instance.processes})

#select_camera currentIndexChanged event
self.main_self.ui_manage_camera_input_device_window.select_camera.currentIndexChanged.connect(lambda index:self.on_select_camera_combobox_changed(index))

#camera_start click event
self.main_self.ui_manage_camera_input_device_window.camera_start.clicked.connect(lambda state:self.on_camera_start_click(state))

#camera_stop click event
self.main_self.ui_manage_camera_input_device_window.camera_stop.clicked.connect(lambda state:self.on_camera_stop_click(state))

#save click event
self.main_self.ui_manage_camera_input_device_window.save.clicked.connect(lambda state:self.on_save_click(state))

#cancel click event
self.main_self.ui_manage_camera_input_device_window.cancel.clicked.connect(lambda state:self.on_cancel_click(state))

self.main_self.manage_camera_input_device_window.closeEvent = lambda event:self.closeEvent(event)
except Exception as e:
error_message = str(traceback.format_exc())
self.main_self.manage_camera_input_device_window.closeEvent = lambda event:self.closeEvent(event)
self.timer = QtCore.QTimer() # set up your QTimer
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message)) # connect it to your update function
self.timer.setSingleShot(True)
self.timer.start(200) # set it to timeout in 200 ms

#select_camera currentIndexChanged event
def on_select_camera_combobox_changed(self,index):
try:
self.need_save = True
except:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

#camera_start click event
def on_camera_start_click(self,state):
try:
device_name = self.main_self.ui_manage_camera_input_device_window.select_camera.currentText()
if device_name!="Καμία συσκευή κάμερας":
self.main_self.ui_manage_camera_input_device_window.camera_start.setEnabled(False)
self.main_self.ui_manage_camera_input_device_window.camera_stop.setEnabled(True)
self.main_self.ui_manage_camera_input_device_window.select_camera.setEnabled(False)
self.manage_camera_input_device_queue.put({"type":"test","content":device_name})
except:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

#camera_stop click event
def on_camera_stop_click(self,state):
try:
self.manage_camera_input_device_queue.put({"type":"stop","content":""})

self.main_self.ui_manage_camera_input_device_window.camera_start.setEnabled(True)
self.main_self.ui_manage_camera_input_device_window.camera_stop.setEnabled(False)
self.main_self.ui_manage_camera_input_device_window.select_camera.setEnabled(True)

self.main_self.ui_manage_camera_input_device_window.video_label.clear()
except:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

#save click event
def on_save_click(self,state):
try:
device_name = self.main_self.ui_manage_camera_input_device_window.select_camera.currentText()

self.manage_camera_input_device_queue.put({"type":"save","device_name":device_name})
except:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

#cancel click event
def on_cancel_click(self,state):
try:
self.main_self.manage_camera_input_device_window.close()
except:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

#signal for display video image frame
def on_image_ready(self,rgb_image):
try:
if self.main_self.ui_manage_camera_input_device_window.camera_start.isEnabled()==False:
rgb_image = rgb_image[0]
self.main_self.ui_manage_camera_input_device_window.video_label.show()

h, w, ch = rgb_image.shape
bytes_per_line = ch * w
q_img = QtGui.QImage(rgb_image.data, w, h, bytes_per_line, QtGui.QImage.Format_RGB888)
self.main_self.ui_manage_camera_input_device_window.video_label.setPixmap(QtGui.QPixmap.fromImage(q_img))

self.main_self.ui_manage_camera_input_device_window.video_label.show()
except:
error_message = str(traceback.format_exc())
print(error_message)
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

def pil2pixmap(self, im):
try:
if im.mode == "RGB":
r, g, b = im.split()
im = Image.merge("RGB", (b, g, r))
elif im.mode == "RGBA":
r, g, b, a = im.split()
im = Image.merge("RGBA", (b, g, r, a))
elif im.mode == "L":
im = im.convert("RGBA")
# Bild in RGBA konvertieren, falls nicht bereits passiert
im2 = im.convert("RGBA")
data = im2.tobytes("raw", "RGBA")
qim = QtGui.QImage(data, im.size[0], im.size[1], QtGui.QImage.Format_ARGB32)
pixmap = QtGui.QPixmap.fromImage(qim)
return pixmap
except:
error_message = traceback.format_exc()
self.main_self.open_ip_calls_error_window(error_message)

#signal for display device names
def display_devices_and_settings(self,camera_input_device_list,device_index):
try:
for input_device in camera_input_device_list:
self.main_self.ui_manage_camera_input_device_window.select_camera.addItem(input_device[2])

if(input_device[1]==device_index):
self.main_self.ui_manage_camera_input_device_window.select_camera.setCurrentIndex(input_device[0])

self.main_self.ui_manage_camera_input_device_window.select_camera.setEnabled(True)
self.main_self.ui_manage_camera_input_device_window.camera_start.setEnabled(True)
self.main_self.ui_manage_camera_input_device_window.camera_stop.setEnabled(True)
self.main_self.ui_manage_camera_input_device_window.save.setEnabled(True)

self.need_save = False
except:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer()
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message))
self.timer.setSingleShot(True)
self.timer.start(200)

#when save finished event
def save_finished(self):
try:
self.manage_camera_input_device_child_process.terminate()
self.manage_camera_input_device_emitter.quit()
self.manage_camera_input_device_child_process = None
self.manage_camera_input_device_emitter = None

counter = 0
for process in self.main_self.manage_processes_instance.processes:
if "process_number" in process:
if process["process_number"]==self.process_number:
self.main_self.manage_processes_instance.processes[counter]["pid"] = None
self.main_self.manage_processes_instance.processes[counter]["start_datetime"] = None
self.main_self.manage_processes_instance.processes[counter]["status"] = "stopped"
self.main_self.manage_processes_instance.processes[counter]["cpu"] = 0
self.main_self.manage_processes_instance.processes[counter]["ram"] = 0
counter += 1

if self.main_self.manage_proccesses_window_is_open:
self.main_self.manage_proccesses_window_support_code.manage_proccesses_queue.put({"type":"table-update","processes":self.main_self.manage_processes_instance.processes})

self.need_save = False

self.main_self.manage_camera_input_device_window.close()
except Exception as e:
error_message = str(traceback.format_exc())
self.main_self.manage_input_device_window.closeEvent = lambda event:self.closeEvent(event)
self.timer = QtCore.QTimer() # set up your QTimer
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message)) # connect it to your update function
self.timer.setSingleShot(True)
self.timer.start(200) # set it to timeout in 200 ms

def close_window_with_error_timer(self,error_message):
self.timer = QtCore.QTimer() # set up your QTimer
self.timer.timeout.connect(lambda error_message=error_message:self.close_window_with_error(None,error_message)) # connect it to your update function
self.timer.setSingleShot(True)
self.timer.start(200) # set it to timeout in 200 ms

def close_window_with_error(self,state,error_message):
self.need_save = False
self.main_self.manage_camera_input_device_window.close()
self.main_self.open_manage_camera_input_device_error_window(error_message)

def open_error_window(self,error_message):
self.main_self.open_manage_camera_input_device_error_window(error_message)

def closeEvent(self,event):
try:
if self.need_save == True:
self.main_self.open_manage_camera_input_device_save_question_window()
#return None

if self.need_save == False:
try:

try:
if self.manage_camera_input_device_child_process is not None:
self.manage_camera_input_device_child_process.terminate()
except:
pass
try:
if self.manage_camera_input_device_emitter is not None:
self.manage_camera_input_device_emitter.quit()
except:
pass

try:
counter = 0
for process in self.main_self.manage_processes_instance.processes:
if "process_number" in process:
if process["process_number"]==self.process_number:
self.main_self.manage_processes_instance.processes[counter]["pid"] = None
self.main_self.manage_processes_instance.processes[counter]["start_datetime"] = None
self.main_self.manage_processes_instance.processes[counter]["status"] = "stopped"
self.main_self.manage_processes_instance.processes[counter]["cpu"] = 0
self.main_self.manage_processes_instance.processes[counter]["ram"] = 0
counter += 1
if self.main_self.manage_proccesses_window_is_open:
self.main_self.manage_proccesses_window_support_code.manage_proccesses_queue.put({"type":"table-update","processes":self.main_self.manage_processes_instance.processes})
except Exception as e:
pass
except:
pass

self.need_save = False
self.main_self.manage_camera_input_device_window_is_open = False
event.accept()
else:
event.ignore()
except Exception as e:
error_message = str(traceback.format_exc())
self.timer = QtCore.QTimer() # set up your QTimer
self.timer.timeout.connect(lambda error_message=error_message:self.open_error_window(error_message)) # connect it to your update function
self.timer.setSingleShot(True)
self.timer.start(200) # set it to timeout in 200 ms

class Manage_Camera_Input_Device_Emitter(QThread):
try:
image_ready_signal = pyqtSignal(list)
save_finished = pyqtSignal()
devices_settings = pyqtSignal(list,int)
error_signal = pyqtSignal(str)
except:
pass

def __init__(self, from_process: Pipe):
try:
super().__init__()
self.data_from_process = from_process
except:
pass

def run(self):
try:
while True:
if self.data_from_process.poll():
data = self.data_from_process.recv()
else:
time.sleep(0.1)
continue
if data["type"]=="image_ready":
self.image_ready_signal.emit(data["pil_image"])
elif data["type"]=="save_finished":
self.save_finished.emit()
elif data["type"]=="available_devices":
self.devices_settings.emit(data["devices"],data["device_index"])
elif data["type"] == "error":
self.error_signal.emit(data["error_message"])
except:
error_message = traceback.format_exc()
self.error_signal.emit(error_message)

class Manage_Camera_Input_Device_Child_Proc(Process):

def __init__(self, to_emitter, from_mother):
try:
super().__init__()
self.daemon = False
self.to_emitter = to_emitter
self.data_from_mother = from_mother
except:
try:
error_message = str(traceback.format_exc())
to_emitter.send({"type":"error","error_message":error_message})
except:
pass

def run(self):
try:
self.fetch_camera_input_settings()

self.play_status = "stopped"
self.process_terminated = False
self.capture = None

while(self.process_terminated == False):
if self.play_status == "stopped":
data = self.data_from_mother.get()
else:
q_size = self.data_from_mother.qsize()
if q_size>0:
data = self.data_from_mother.get()
else:
data = None
if data is not None:
if data["type"] == "stop-process":
if self.play_status != "playing":
self.play_status = "stopped"
if self.capture is not None:
self.capture.release()
self.process_terminated = True
return 1
if data["type"] == "save":
device_name = data["device_name"]
self.save(device_name)
break
elif data["type"] == "test":
self.camera_input_device_name = data["content"]
self.camera_input_device_index = 0
for camera in self.available_cameras:
if camera["device_name"] == self.camera_input_device_name:
self.camera_input_device_index = camera["device_index"]
break
self.play_status = "playing"
try:
self.capture = cv2.VideoCapture(self.camera_input_device_index)
except:
print(traceback.format_exc())
elif data["type"] == "stop":
self.play_status = "stopped"
if self.capture is not None:
self.capture.release()
if self.play_status=="playing":
if self.capture is not None:
ret, frame = self.capture.read()
if ret:
rgb_image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
self.to_emitter.send({"type":"image_ready","pil_image":[rgb_image]})
except:
error_message = str(traceback.format_exc())
print(error_message)
self.to_emitter.send({"type":"error","error_message":error_message})

def get_camera_index(self,camera_name):
try:
c = wmi.WMI()
index = 0
for device in c.Win32_PnPEntity():
if device.Name and ("camera" in device.Name.lower() or "video" in device.Name.lower()):
if device.Name == camera_name:
return index
index += 1
return None
except:
error_message = str(traceback.format_exc())
print(error_message)
self.to_emitter.send({"type":"error","error_message":error_message})

#Gets from database the input device settings
def fetch_camera_input_settings(self):
try:
self.database_functions = database_functions

self.camera_input_device_name = database_functions.read_setting("camera_input_device_name")["value"]
self.camera_input_device_number = int(self.database_functions.read_setting("camera_input_device_number")["value"])

self.camera_input_devices = [[0,-1,"Καμία συσκευή κάμερας"]]
self.available_cameras = [{"device_index":-1,"device_name":"Καμία συσκευή κάμερας"}]

# FFmpeg command to list video capture devices on Windows
cmd = ["ffmpeg", "-list_devices", "true", "-f", "dshow", "-i", "dummy"]
result = subprocess.run(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, text=True)
output = result.stderr # FFmpeg sends device listing to stderr

# Updated regular expression to capture both video and audio devices
device_pattern = re.compile(r'\[dshow @ .+?\] "(.*?)" \(video\)')
cameras = device_pattern.findall(output)
counter = 0
for camera in cameras:
counter += 1
self.camera_input_devices.append([counter,counter-1,camera])
self.available_cameras.append({"device_index": counter-1, "device_name": camera})

self.to_emitter.send({"type":"available_devices","devices":self.camera_input_devices,"device_index":self.camera_input_device_number})
except:
error_message = traceback.format_exc()
self.to_emitter.send({"type":"error","error_message":error_message})

#Save to database the input device settings
def save(self,device_name):
try:
for camera in self.available_cameras:
if(camera["device_name"]==device_name):
device_number=camera["device_index"]
self.camera_input_device_index = device_number

self.database_functions.update_setting({"setting":"camera_input_device_name","value":device_name})
self.database_functions.update_setting({"setting":"camera_input_device_number","value":device_number})

self.to_emitter.send({"type":"save_finished"})
except:
error_message = traceback.format_exc()
self.to_emitter.send({"type":"error","error_message":error_message})


Подробнее здесь: https://stackoverflow.com/questions/791 ... ng-process
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Как устранить ошибку сломанного канала в случае QThread/multiprocessing.Process?
    Anonymous » » в форуме Python
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»