pyqt5: main_ui.py
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object):
    """Widget layout for the demo window: one centred label in a horizontal layout."""

    def setupUi(self, MainWindow):
        """Create the child widgets and install them on ``MainWindow``."""
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(600, 32)

        # Central container and the layout that owns its children.
        central = QtWidgets.QWidget(MainWindow)
        central.setObjectName("centralwidget")
        layout = QtWidgets.QHBoxLayout(central)
        layout.setObjectName("horizontalLayout")

        # The only visible widget: an initially empty, centred label.
        label = QtWidgets.QLabel(central)
        label.setText("")
        label.setAlignment(QtCore.Qt.AlignCenter)
        label.setObjectName("label_1")
        layout.addWidget(label)

        # Expose the widgets under the attribute names the callers use.
        self.centralwidget = central
        self.horizontalLayout = layout
        self.label_1 = label

        MainWindow.setCentralWidget(central)
        MainWindow.setWindowTitle("PyQt5 with Processes or asyncio")
Решение 1 QThread/Process: main.py
from main_ui import Ui_MainWindow
from PyQt5 import QtCore, QtGui, QtWidgets
import sys, os
from PyQt5.QtCore import pyqtSignal, QThread, QTimer
from multiprocessing import Process, Queue, Pipe
import random
import threading
import time
class Main:
    """Application object for the QThread/Process variant.

    Data flow: a producer thread puts random integers on ``self.queue``;
    ``Child_Proc`` reads them and answers over a pipe; ``Emitter`` turns the
    answers into Qt signals that update the label.
    """

    # Seconds closeEvent waits for the worker to exit before terminating it.
    CHILD_JOIN_TIMEOUT = 5.0

    def __init__(self):
        """Build the UI, start worker/emitter/producer and run the Qt loop.

        Does not return: the interpreter exits with the Qt exit code.
        """
        self.app = QtWidgets.QApplication(sys.argv)
        self.MainWindow = QtWidgets.QMainWindow()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self.MainWindow)
        self.MainWindow.show()
        self.create_process()

        # Producer thread; put_data() is kept for callers that prefer to
        # drive the queue from a QTimer instead.
        self.run_thread = True
        self.put_data_thread = threading.Thread(target=self.put_data_thread_method)
        self.put_data_thread.start()

        self.MainWindow.closeEvent = self.closeEvent
        sys.exit(self.app.exec_())

    def put_data(self):
        """Queue a single random input value for the worker."""
        random_int = random.randint(0, 10000)
        self.queue.put({'type': 'input_data', 'input_data': random_int})

    def put_data_thread_method(self):
        """Queue a random input value every 20 ms until ``run_thread`` is cleared."""
        while self.run_thread:
            random_int = random.randint(0, 10000)
            self.queue.put({'type': 'input_data', 'input_data': random_int})
            time.sleep(0.020)

    def create_process(self):
        """Create the pipe/queue pair and start the emitter thread and worker process."""
        self.mother_pipe, self.child_pipe = Pipe()
        self.queue = Queue()

        self.emitter = Emitter(self.mother_pipe)
        self.emitter.error_signal.connect(lambda error_message: print(error_message))
        self.emitter.one_signal.connect(self.ui.label_1.setText)
        self.emitter.start()

        self.child_process = Child_Proc(self.child_pipe, self.queue)
        self.child_process.start()

    def closeEvent(self, event):
        """Shut down producer, worker and emitter (in that order), then accept the close."""
        # Stop the producer first so nothing is queued behind the close message.
        self.run_thread = False
        self.put_data_thread.join()
        #self.put_data_timer.stop()

        self.queue.put({'type': 'close'})
        # Bounded wait so a stuck worker cannot freeze the window forever;
        # terminate() is only a fallback for a worker that did not exit.
        self.child_process.join(self.CHILD_JOIN_TIMEOUT)
        if self.child_process.is_alive():
            self.child_process.terminate()
            self.child_process.join()

        if self.child_process.exitcode != 0:
            # The worker died or was killed, so it may never have forwarded
            # 'close'. Writing to our copy of the child end delivers the
            # message to mother_pipe and unblocks Emitter.recv().
            self.child_pipe.send({'type': 'close'})

        # Emitter.run() has no Qt event loop, so QThread.quit() would do
        # nothing; wait() blocks until run() has actually returned.
        self.emitter.wait()
        event.accept()
class Emitter(QThread):
    """Thread that receives worker messages from a pipe and re-emits them as Qt signals.

    Messages are dicts with a ``"type"`` key:
    ``"data"`` -> ``one_signal(data["data"])``,
    ``"error"`` -> ``error_signal(data["error_message"])``,
    ``"close"`` -> the thread ends.
    """

    error_signal = pyqtSignal(str)
    one_signal = pyqtSignal(str)

    def __init__(self, from_process: Pipe):
        """Store the receiving end of the pipe (an object providing ``recv()``)."""
        super().__init__()
        self.data_from_process = from_process

    def run(self):
        """Relay messages until ``"close"`` arrives or receiving fails."""
        # traceback is not among the module-level imports of this file, so the
        # error path used to fail with NameError; import it locally.
        import traceback

        try:
            while True:
                data = self.data_from_process.recv()
                kind = data["type"]
                if kind == "error":
                    self.error_signal.emit(data["error_message"])
                elif kind == "data":
                    self.one_signal.emit(data["data"])
                elif kind == "close":
                    return
        except Exception:
            # Any failure (broken pipe, malformed message) ends the thread
            # after reporting the traceback through error_signal.
            self.error_signal.emit(traceback.format_exc())
class Child_Proc(Process):
    """Worker process: multiplies every received integer by ten.

    Input arrives as dicts from ``from_mother`` (an object with ``get()``);
    results leave through ``to_emitter`` (an object with ``send()``).
    Message types: ``'input_data'`` (payload under ``'input_data'``) and
    ``'close'``.
    """

    def __init__(self, to_emitter, from_mother):
        """Store the communication endpoints.

        On a construction failure the error is reported through
        ``to_emitter`` and the *calling* process exits with status 1
        (``__init__`` runs in the process that creates the object).
        """
        try:
            super().__init__()
            self.daemon = False
            self.to_emitter = to_emitter
            self.data_from_mother = from_mother
        except Exception:
            # Local import: traceback is not imported at module level.
            import traceback

            # The endpoint is a pipe connection: it offers send(), not put().
            to_emitter.send({"type": "error", "error_message": traceback.format_exc()})
            os._exit(1)

    def run(self):
        """Process messages until ``'close'`` arrives or a message fails.

        Whatever ends the loop, a close message is always forwarded so the
        receiving side never stays blocked in ``recv()``.
        """
        import traceback

        closing = {'type': 'close'}
        try:
            while True:
                data = self.data_from_mother.get()
                kind = data['type']
                if kind == 'close':
                    # Forward the original close message unchanged.
                    closing = data
                    break
                if kind == 'input_data':
                    result = data['input_data'] * 10
                    self.to_emitter.send({'type': 'data', 'data': str(result)})
        except Exception:
            self.to_emitter.send({'type': 'error', 'error_message': traceback.format_exc()})
        finally:
            self.to_emitter.send(closing)
if __name__ == "__main__":
    # Main.__init__ builds the UI, runs the Qt event loop and then exits the
    # interpreter, so nothing after this line is executed.
    program = Main()
Решение 2, asyncio (собственная реализация): main_asyncio.py
from main_ui import Ui_MainWindow
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QObject, pyqtSignal
import sys
import random
import time
import asyncio
import qasync
from aioprocessing import AioProcess, AioQueue
import threading
import time
class Main:
    """Application object for the asyncio variant (qasync + aioprocessing).

    A producer thread puts random integers on ``input_queue``; the worker
    process (``process_run``) answers on ``output_queue``; an asyncio task
    awaits those answers and updates the label.
    """

    def __init__(self):
        """Build the UI, start worker/producer/listener and run the event loop.

        Does not return: the interpreter exits with the loop's result code.
        """
        self.app = QtWidgets.QApplication(sys.argv)
        self.loop = qasync.QEventLoop(self.app)
        asyncio.set_event_loop(self.loop)

        self.MainWindow = QtWidgets.QMainWindow()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self.MainWindow)
        self.MainWindow.show()

        # Setup inter-process communication
        self.input_queue = AioQueue()
        self.output_queue = AioQueue()
        self.process = AioProcess(target=process_run, args=(self.input_queue, self.output_queue))
        self.process.start()

        self.run_thread = True
        self.put_data_thread = threading.Thread(target=self.put_data_thread_method)
        self.put_data_thread.start()

        # Keep a reference: the event loop only holds weak references to
        # tasks, and create_task() does not need an already running loop.
        self.listen_task = self.loop.create_task(self.listen_to_output_queue())

        self.MainWindow.closeEvent = self.closeEvent

        # Run the Qt application through the asyncio loop so that the loop is
        # registered as running while coroutines execute, and close it
        # afterwards.
        with self.loop:
            exit_code = self.loop.run_forever()
        sys.exit(exit_code)

    async def listen_to_output_queue(self):
        """Show every worker result in the label until cancelled or told to close."""
        while True:
            try:
                data = await self.output_queue.coro_get()
            except asyncio.CancelledError:
                break
            kind = data["type"]
            if kind == "error":
                print(f"Error: {data['message']}")
            elif kind == "data":
                self.ui.label_1.setText(data["message"])
            elif kind == "close":
                break

    def put_data_thread_method(self):
        """Queue a random input value every 20 ms until ``run_thread`` is cleared."""
        while self.run_thread:
            random_int = random.randint(0, 10000)
            self.input_queue.put({"type": "input_data", "input_data": random_int})
            time.sleep(0.020)

    def closeEvent(self, event):
        """Stop producer and worker, release the listener, then accept the close."""
        self.run_thread = False
        self.put_data_thread.join()
        self.input_queue.put({"type": "close"})
        self.process.join()
        # The worker does not echo "close" on the output queue, so the
        # listener's pending coro_get() would otherwise wait forever.
        self.output_queue.put({"type": "close"})
        event.accept()
def process_run(input_queue, output_queue):
    """Worker loop: multiply each received integer by ten.

    Args:
        input_queue: object with a blocking ``get()`` yielding message dicts.
            ``{"type": "input_data", "input_data": n}`` requests a result,
            ``{"type": "close"}`` ends the loop; other types are ignored.
        output_queue: object with ``put()``. Receives
            ``{"type": "data", "message": str(n * 10)}`` per request, or
            ``{"type": "error", "message": traceback}`` when a message
            cannot be processed.
    """
    # Local import keeps the function self-contained in the worker process.
    import traceback

    while True:
        data = input_queue.get()
        try:
            kind = data["type"]
            if kind == "close":
                break
            if kind == "input_data":
                result = data["input_data"] * 10
                output_queue.put({"type": "data", "message": str(result)})
        except Exception:
            # One malformed message must not kill the worker: report it in
            # the "error" shape the listener already understands and go on.
            output_queue.put({"type": "error", "message": traceback.format_exc()})
if __name__ == "__main__":
    # Main.__init__ builds the UI, runs the event loop and then exits the
    # interpreter, so nothing after this line is executed.
    Main()
Оба решения работают, но решение 2 я протестировал не полностью.
Кроме того, я не знаю, как улучшить решение 2 средствами asyncio (PyQt5 + asyncio). Я хочу измерить время работы каждой реализации, взять более быструю и отказаться от более медленной.
Также обратите внимание: поскольку в обоих решениях используется много пар QThread/Process, для них существует общий поток синхронизации:
import threading
import traceback
from multiprocessing import Condition, Value, Event
import time
# Shared synchronisation state (conditions, flags, events, frame counter) plus
# a heartbeat thread that advances the frame counter for the worker processes.
# NOTE(review): this listing has lost its indentation, and the statement that
# starts with "if not isinstance(sync_cycle_ms" is cut off mid-expression.
# Restore both from the original file before running this code.
class Sync_processes:
# main_self: owner object; start() reads main_self.configuration from it.
def __init__(self, main_self):
self.main_self = main_self
# condition: notified by the heartbeat after every frame increment.
self.condition = Condition()
# start_condition: notified once by start().
self.start_condition = Condition()
# continue_condition: notified by the heartbeat when continue_flag is set.
self.continue_condition = Condition()
# All Values below are created with lock=False (no per-value lock); the
# visible writes to frame, start_flag and continue_flag all happen while
# one of the Condition objects above is held.
self.frame = Value('q',100, lock=False) # Shared frame counter (typecode 'q', starts at 100)
# Shared boolean flags (typecode 'b'), all initialised to False
self.start_flag = Value('b', False, lock=False)
self.continue_flag = Value('b', False, lock=False)
# quit_event makes the heartbeat return; pre_quit_event makes it skip its
# sleeps and the wait on flag_condition.
self.quit_event = Event()
self.pre_quit_event = Event()
# Flags read by the heartbeat's wait predicate. final_slice_plot_flag is
# created here but is not part of that predicate.
self.deck_1_flag = Value('b', False, lock=False)
self.deck_2_flag = Value('b', False, lock=False)
self.music_clip_deck_flag = Value('b', False, lock=False)
self.final_slice_flag = Value('b', False, lock=False)
self.final_slice_plot_flag = Value('b', False, lock=False)
self.flag_condition = Condition()
# Set start_flag, wake everything waiting on start_condition and launch the
# heartbeat thread. The thread object is created only if the instance has no
# 'tr' attribute yet.
# NOTE(review): whether tr.start() sits inside that check cannot be told
# from this listing; starting the same Thread twice raises RuntimeError.
def start(self):
with self.start_condition:
self.start_flag.value = True
self.start_condition.notify_all()
if 'tr' not in dir(self):
self.tr = threading.Thread(target=self.infinite_heartbeat, args=(self.main_self.configuration,))
self.tr.start()
# Heartbeat body, run in the thread created by start().
# configuration: mapping; "sync_cycle_ms" is read with a default of 100.
def infinite_heartbeat(self, configuration):
try:
sync_cycle_ms = configuration.get("sync_cycle_ms", 100)
# NOTE(review): the next line is truncated. The code that validates
# sync_cycle_ms and assigns sync_cycle_seconds is missing (that name is used
# below but never assigned in the visible code), and time_to_sleep is read
# before any visible assignment. The repeated early returns suggest a loop
# header was lost here as well - confirm against the original.
if not isinstance(sync_cycle_ms, (int, float)) or sync_cycle_ms 0:
if not self.pre_quit_event.is_set():
time.sleep(time_to_sleep)
else:
time_to_sleep = sync_cycle_seconds / 5
if not self.pre_quit_event.is_set():
time.sleep(time_to_sleep)
# Advance the frame: clear continue_flag, add 1 to the counter (1000 once
# pre_quit_event is set) and wake everything waiting on condition.
with self.condition:
self.continue_flag.value = False
if not self.pre_quit_event.is_set():
self.frame.value += 1
else:
self.frame.value += 1000
self.condition.notify_all()
if self.quit_event.is_set():
return
# Unless pre-quitting, wait until deck_1, deck_2, music_clip_deck and
# final_slice flags are all set, or quit_event is set.
if not self.pre_quit_event.is_set():
with self.flag_condition:
self.flag_condition.wait_for(lambda: (self.quit_event.is_set() or (self.deck_1_flag.value and self.deck_2_flag.value and self.music_clip_deck_flag.value and self.final_slice_flag.value)))
if self.quit_event.is_set():
return
time.sleep(sync_cycle_seconds/300)
# Release the waiters: set continue_flag and notify continue_condition.
with self.continue_condition:
self.continue_flag.value = True
self.continue_condition.notify_all()
if self.quit_event.is_set():
return
except Exception as e:
# Any failure is printed; the heartbeat thread then ends.
print(traceback.format_exc())
# Ask the heartbeat to stop and wait for its thread.
# NOTE(review): nothing here notifies flag_condition, and wait_for() above has
# no timeout, so a heartbeat blocked there may not notice quit_event unless
# another component notifies that condition - confirm. self.tr exists only
# after start() has been called.
def close(self):
self.quit_event.set()
self.tr.join()
и для первого решения это выглядит так:
# Worker-side main loop for the first solution; a method of a class that is
# not shown in this listing.
# NOTE(review): indentation was lost in this listing, so the nesting described
# in the comments below is inferred - confirm against the original file.
def run(self):
try:
self.fetch_player_list_settings()
# Under start_condition: bump the shared frame counter, remember its value as
# this worker's current frame, then block until start_flag is set.
with self.start_condition:
self.frame_number.value += 1
self.current_frame = self.frame_number.value
self.start_condition.wait_for(lambda: self.start_flag.value)
# Main loop: runs until quit_event or pre_quit_event is set.
while (not self.quit_event.is_set()) and (not self.pre_quit_event.is_set()):
# Wait until the shared counter has moved past our frame (or pre-quit).
with self.condition:
self.condition.wait_for(lambda: self.current_frame < self.frame_number.value or self.pre_quit_event.is_set())
if self.pre_quit_event.is_set():
break
# Raise this worker's flag and notify deck_1_condition.
# presumably this is the condition the heartbeat waits on - verify.
with self.deck_1_condition:
self.deck_1_flag.value = True
self.deck_1_condition.notify_all()
# Wait for continue_flag (or pre-quit) before doing the work.
with self.continue_condition:
self.continue_condition.wait_for(lambda: self.continue_flag.value or self.pre_quit_event.is_set())
if self.pre_quit_event.is_set():
break
result = self.one_chunk() # Process one chunk of data
if result == "close":
break
self.current_frame += 1
# Lower the flag again and notify deck_1_condition.
with self.deck_1_condition:
self.deck_1_flag.value = False
self.deck_1_condition.notify_all()
# Shutdown: discard whatever is still queued for this worker.
# NOTE(review): if data_from_mother is a multiprocessing.Queue, qsize() is
# approximate and not implemented on every platform - confirm.
while (self.data_from_mother.qsize() > 0):
_ = self.data_from_mother.get()
# Queue a 'stopped' status and a close message for ourselves, run one_chunk()
# once more (presumably to consume them - verify), then tell the emitter to
# close.
self.data_from_mother.put({"type": "new-status", "status": 'stopped'})
self.data_from_mother.put({"type": "close"})
r = self.one_chunk()
self.to_emitter.send({'type': 'close'})
return
except Exception as e:
# Report the traceback to the emitter as an "error" message.
error_message = str(traceback.format_exc())
self.to_emitter.send({"type": "error", "error_message": error_message})
Подробнее здесь: https://stackoverflow.com/questions/79317533/python-pyqt5-multiprocessing-vs-asyncio-custom-solution
Многопроцессорность Python PyQt5 против asyncio (индивидуальное решение) ⇐ Python
Программы на Python
1735560061
Anonymous
pyqt5: main_ui.py
from PyQt5 import QtCore, QtGui, QtWidgets
class Ui_MainWindow(object):
    """Widget layout for the demo window: one centred label in a horizontal layout."""

    def setupUi(self, MainWindow):
        """Create the child widgets and install them on ``MainWindow``."""
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(600, 32)

        # Central container and the layout that owns its children.
        central = QtWidgets.QWidget(MainWindow)
        central.setObjectName("centralwidget")
        layout = QtWidgets.QHBoxLayout(central)
        layout.setObjectName("horizontalLayout")

        # The only visible widget: an initially empty, centred label.
        label = QtWidgets.QLabel(central)
        label.setText("")
        label.setAlignment(QtCore.Qt.AlignCenter)
        label.setObjectName("label_1")
        layout.addWidget(label)

        # Expose the widgets under the attribute names the callers use.
        self.centralwidget = central
        self.horizontalLayout = layout
        self.label_1 = label

        MainWindow.setCentralWidget(central)
        MainWindow.setWindowTitle("PyQt5 with Processes or asyncio")
Решение 1 QThread/Process: main.py
from main_ui import Ui_MainWindow
from PyQt5 import QtCore, QtGui, QtWidgets
import sys, os
from PyQt5.QtCore import pyqtSignal, QThread, QTimer
from multiprocessing import Process, Queue, Pipe
import random
import threading
import time
class Main:
    """Application object for the QThread/Process variant.

    Data flow: a producer thread puts random integers on ``self.queue``;
    ``Child_Proc`` reads them and answers over a pipe; ``Emitter`` turns the
    answers into Qt signals that update the label.
    """

    # Seconds closeEvent waits for the worker to exit before terminating it.
    CHILD_JOIN_TIMEOUT = 5.0

    def __init__(self):
        """Build the UI, start worker/emitter/producer and run the Qt loop.

        Does not return: the interpreter exits with the Qt exit code.
        """
        self.app = QtWidgets.QApplication(sys.argv)
        self.MainWindow = QtWidgets.QMainWindow()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self.MainWindow)
        self.MainWindow.show()
        self.create_process()

        # Producer thread; put_data() is kept for callers that prefer to
        # drive the queue from a QTimer instead.
        self.run_thread = True
        self.put_data_thread = threading.Thread(target=self.put_data_thread_method)
        self.put_data_thread.start()

        self.MainWindow.closeEvent = self.closeEvent
        sys.exit(self.app.exec_())

    def put_data(self):
        """Queue a single random input value for the worker."""
        random_int = random.randint(0, 10000)
        self.queue.put({'type': 'input_data', 'input_data': random_int})

    def put_data_thread_method(self):
        """Queue a random input value every 20 ms until ``run_thread`` is cleared."""
        while self.run_thread:
            random_int = random.randint(0, 10000)
            self.queue.put({'type': 'input_data', 'input_data': random_int})
            time.sleep(0.020)

    def create_process(self):
        """Create the pipe/queue pair and start the emitter thread and worker process."""
        self.mother_pipe, self.child_pipe = Pipe()
        self.queue = Queue()

        self.emitter = Emitter(self.mother_pipe)
        self.emitter.error_signal.connect(lambda error_message: print(error_message))
        self.emitter.one_signal.connect(self.ui.label_1.setText)
        self.emitter.start()

        self.child_process = Child_Proc(self.child_pipe, self.queue)
        self.child_process.start()

    def closeEvent(self, event):
        """Shut down producer, worker and emitter (in that order), then accept the close."""
        # Stop the producer first so nothing is queued behind the close message.
        self.run_thread = False
        self.put_data_thread.join()
        #self.put_data_timer.stop()

        self.queue.put({'type': 'close'})
        # Bounded wait so a stuck worker cannot freeze the window forever;
        # terminate() is only a fallback for a worker that did not exit.
        self.child_process.join(self.CHILD_JOIN_TIMEOUT)
        if self.child_process.is_alive():
            self.child_process.terminate()
            self.child_process.join()

        if self.child_process.exitcode != 0:
            # The worker died or was killed, so it may never have forwarded
            # 'close'. Writing to our copy of the child end delivers the
            # message to mother_pipe and unblocks Emitter.recv().
            self.child_pipe.send({'type': 'close'})

        # Emitter.run() has no Qt event loop, so QThread.quit() would do
        # nothing; wait() blocks until run() has actually returned.
        self.emitter.wait()
        event.accept()
class Emitter(QThread):
    """Thread that receives worker messages from a pipe and re-emits them as Qt signals.

    Messages are dicts with a ``"type"`` key:
    ``"data"`` -> ``one_signal(data["data"])``,
    ``"error"`` -> ``error_signal(data["error_message"])``,
    ``"close"`` -> the thread ends.
    """

    error_signal = pyqtSignal(str)
    one_signal = pyqtSignal(str)

    def __init__(self, from_process: Pipe):
        """Store the receiving end of the pipe (an object providing ``recv()``)."""
        super().__init__()
        self.data_from_process = from_process

    def run(self):
        """Relay messages until ``"close"`` arrives or receiving fails."""
        # traceback is not among the module-level imports of this file, so the
        # error path used to fail with NameError; import it locally.
        import traceback

        try:
            while True:
                data = self.data_from_process.recv()
                kind = data["type"]
                if kind == "error":
                    self.error_signal.emit(data["error_message"])
                elif kind == "data":
                    self.one_signal.emit(data["data"])
                elif kind == "close":
                    return
        except Exception:
            # Any failure (broken pipe, malformed message) ends the thread
            # after reporting the traceback through error_signal.
            self.error_signal.emit(traceback.format_exc())
class Child_Proc(Process):
    """Worker process: multiplies every received integer by ten.

    Input arrives as dicts from ``from_mother`` (an object with ``get()``);
    results leave through ``to_emitter`` (an object with ``send()``).
    Message types: ``'input_data'`` (payload under ``'input_data'``) and
    ``'close'``.
    """

    def __init__(self, to_emitter, from_mother):
        """Store the communication endpoints.

        On a construction failure the error is reported through
        ``to_emitter`` and the *calling* process exits with status 1
        (``__init__`` runs in the process that creates the object).
        """
        try:
            super().__init__()
            self.daemon = False
            self.to_emitter = to_emitter
            self.data_from_mother = from_mother
        except Exception:
            # Local import: traceback is not imported at module level.
            import traceback

            # The endpoint is a pipe connection: it offers send(), not put().
            to_emitter.send({"type": "error", "error_message": traceback.format_exc()})
            os._exit(1)

    def run(self):
        """Process messages until ``'close'`` arrives or a message fails.

        Whatever ends the loop, a close message is always forwarded so the
        receiving side never stays blocked in ``recv()``.
        """
        import traceback

        closing = {'type': 'close'}
        try:
            while True:
                data = self.data_from_mother.get()
                kind = data['type']
                if kind == 'close':
                    # Forward the original close message unchanged.
                    closing = data
                    break
                if kind == 'input_data':
                    result = data['input_data'] * 10
                    self.to_emitter.send({'type': 'data', 'data': str(result)})
        except Exception:
            self.to_emitter.send({'type': 'error', 'error_message': traceback.format_exc()})
        finally:
            self.to_emitter.send(closing)
if __name__ == "__main__":
    # Main.__init__ builds the UI, runs the Qt event loop and then exits the
    # interpreter, so nothing after this line is executed.
    program = Main()
Решение 2, asyncio (собственная реализация): main_asyncio.py
from main_ui import Ui_MainWindow
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QObject, pyqtSignal
import sys
import random
import time
import asyncio
import qasync
from aioprocessing import AioProcess, AioQueue
import threading
import time
class Main:
    """Application object for the asyncio variant (qasync + aioprocessing).

    A producer thread puts random integers on ``input_queue``; the worker
    process (``process_run``) answers on ``output_queue``; an asyncio task
    awaits those answers and updates the label.
    """

    def __init__(self):
        """Build the UI, start worker/producer/listener and run the event loop.

        Does not return: the interpreter exits with the loop's result code.
        """
        self.app = QtWidgets.QApplication(sys.argv)
        self.loop = qasync.QEventLoop(self.app)
        asyncio.set_event_loop(self.loop)

        self.MainWindow = QtWidgets.QMainWindow()
        self.ui = Ui_MainWindow()
        self.ui.setupUi(self.MainWindow)
        self.MainWindow.show()

        # Setup inter-process communication
        self.input_queue = AioQueue()
        self.output_queue = AioQueue()
        self.process = AioProcess(target=process_run, args=(self.input_queue, self.output_queue))
        self.process.start()

        self.run_thread = True
        self.put_data_thread = threading.Thread(target=self.put_data_thread_method)
        self.put_data_thread.start()

        # Keep a reference: the event loop only holds weak references to
        # tasks, and create_task() does not need an already running loop.
        self.listen_task = self.loop.create_task(self.listen_to_output_queue())

        self.MainWindow.closeEvent = self.closeEvent

        # Run the Qt application through the asyncio loop so that the loop is
        # registered as running while coroutines execute, and close it
        # afterwards.
        with self.loop:
            exit_code = self.loop.run_forever()
        sys.exit(exit_code)

    async def listen_to_output_queue(self):
        """Show every worker result in the label until cancelled or told to close."""
        while True:
            try:
                data = await self.output_queue.coro_get()
            except asyncio.CancelledError:
                break
            kind = data["type"]
            if kind == "error":
                print(f"Error: {data['message']}")
            elif kind == "data":
                self.ui.label_1.setText(data["message"])
            elif kind == "close":
                break

    def put_data_thread_method(self):
        """Queue a random input value every 20 ms until ``run_thread`` is cleared."""
        while self.run_thread:
            random_int = random.randint(0, 10000)
            self.input_queue.put({"type": "input_data", "input_data": random_int})
            time.sleep(0.020)

    def closeEvent(self, event):
        """Stop producer and worker, release the listener, then accept the close."""
        self.run_thread = False
        self.put_data_thread.join()
        self.input_queue.put({"type": "close"})
        self.process.join()
        # The worker does not echo "close" on the output queue, so the
        # listener's pending coro_get() would otherwise wait forever.
        self.output_queue.put({"type": "close"})
        event.accept()
def process_run(input_queue, output_queue):
    """Worker loop: multiply each received integer by ten.

    Args:
        input_queue: object with a blocking ``get()`` yielding message dicts.
            ``{"type": "input_data", "input_data": n}`` requests a result,
            ``{"type": "close"}`` ends the loop; other types are ignored.
        output_queue: object with ``put()``. Receives
            ``{"type": "data", "message": str(n * 10)}`` per request, or
            ``{"type": "error", "message": traceback}`` when a message
            cannot be processed.
    """
    # Local import keeps the function self-contained in the worker process.
    import traceback

    while True:
        data = input_queue.get()
        try:
            kind = data["type"]
            if kind == "close":
                break
            if kind == "input_data":
                result = data["input_data"] * 10
                output_queue.put({"type": "data", "message": str(result)})
        except Exception:
            # One malformed message must not kill the worker: report it in
            # the "error" shape the listener already understands and go on.
            output_queue.put({"type": "error", "message": traceback.format_exc()})
if __name__ == "__main__":
    # Main.__init__ builds the UI, runs the event loop and then exits the
    # interpreter, so nothing after this line is executed.
    Main()
Оба решения работают, но решение 2 я протестировал не полностью.
Кроме того, я не знаю, как улучшить решение 2 средствами asyncio (PyQt5 + asyncio). Я хочу измерить время работы каждой реализации, взять более быструю и отказаться от более медленной.
Также обратите внимание: поскольку в обоих решениях используется много пар QThread/Process, для них существует общий поток синхронизации:
import threading
import traceback
from multiprocessing import Condition, Value, Event
import time
# Shared synchronisation state (conditions, flags, events, frame counter) plus
# a heartbeat thread that advances the frame counter for the worker processes.
# NOTE(review): this listing has lost its indentation, and the statement that
# starts with "if not isinstance(sync_cycle_ms" is cut off mid-expression.
# Restore both from the original file before running this code.
class Sync_processes:
# main_self: owner object; start() reads main_self.configuration from it.
def __init__(self, main_self):
self.main_self = main_self
# condition: notified by the heartbeat after every frame increment.
self.condition = Condition()
# start_condition: notified once by start().
self.start_condition = Condition()
# continue_condition: notified by the heartbeat when continue_flag is set.
self.continue_condition = Condition()
# All Values below are created with lock=False (no per-value lock); the
# visible writes to frame, start_flag and continue_flag all happen while
# one of the Condition objects above is held.
self.frame = Value('q',100, lock=False) # Shared frame counter (typecode 'q', starts at 100)
# Shared boolean flags (typecode 'b'), all initialised to False
self.start_flag = Value('b', False, lock=False)
self.continue_flag = Value('b', False, lock=False)
# quit_event makes the heartbeat return; pre_quit_event makes it skip its
# sleeps and the wait on flag_condition.
self.quit_event = Event()
self.pre_quit_event = Event()
# Flags read by the heartbeat's wait predicate. final_slice_plot_flag is
# created here but is not part of that predicate.
self.deck_1_flag = Value('b', False, lock=False)
self.deck_2_flag = Value('b', False, lock=False)
self.music_clip_deck_flag = Value('b', False, lock=False)
self.final_slice_flag = Value('b', False, lock=False)
self.final_slice_plot_flag = Value('b', False, lock=False)
self.flag_condition = Condition()
# Set start_flag, wake everything waiting on start_condition and launch the
# heartbeat thread. The thread object is created only if the instance has no
# 'tr' attribute yet.
# NOTE(review): whether tr.start() sits inside that check cannot be told
# from this listing; starting the same Thread twice raises RuntimeError.
def start(self):
with self.start_condition:
self.start_flag.value = True
self.start_condition.notify_all()
if 'tr' not in dir(self):
self.tr = threading.Thread(target=self.infinite_heartbeat, args=(self.main_self.configuration,))
self.tr.start()
# Heartbeat body, run in the thread created by start().
# configuration: mapping; "sync_cycle_ms" is read with a default of 100.
def infinite_heartbeat(self, configuration):
try:
sync_cycle_ms = configuration.get("sync_cycle_ms", 100)
# NOTE(review): the next line is truncated. The code that validates
# sync_cycle_ms and assigns sync_cycle_seconds is missing (that name is used
# below but never assigned in the visible code), and time_to_sleep is read
# before any visible assignment. The repeated early returns suggest a loop
# header was lost here as well - confirm against the original.
if not isinstance(sync_cycle_ms, (int, float)) or sync_cycle_ms 0:
if not self.pre_quit_event.is_set():
time.sleep(time_to_sleep)
else:
time_to_sleep = sync_cycle_seconds / 5
if not self.pre_quit_event.is_set():
time.sleep(time_to_sleep)
# Advance the frame: clear continue_flag, add 1 to the counter (1000 once
# pre_quit_event is set) and wake everything waiting on condition.
with self.condition:
self.continue_flag.value = False
if not self.pre_quit_event.is_set():
self.frame.value += 1
else:
self.frame.value += 1000
self.condition.notify_all()
if self.quit_event.is_set():
return
# Unless pre-quitting, wait until deck_1, deck_2, music_clip_deck and
# final_slice flags are all set, or quit_event is set.
if not self.pre_quit_event.is_set():
with self.flag_condition:
self.flag_condition.wait_for(lambda: (self.quit_event.is_set() or (self.deck_1_flag.value and self.deck_2_flag.value and self.music_clip_deck_flag.value and self.final_slice_flag.value)))
if self.quit_event.is_set():
return
time.sleep(sync_cycle_seconds/300)
# Release the waiters: set continue_flag and notify continue_condition.
with self.continue_condition:
self.continue_flag.value = True
self.continue_condition.notify_all()
if self.quit_event.is_set():
return
except Exception as e:
# Any failure is printed; the heartbeat thread then ends.
print(traceback.format_exc())
# Ask the heartbeat to stop and wait for its thread.
# NOTE(review): nothing here notifies flag_condition, and wait_for() above has
# no timeout, so a heartbeat blocked there may not notice quit_event unless
# another component notifies that condition - confirm. self.tr exists only
# after start() has been called.
def close(self):
self.quit_event.set()
self.tr.join()
и для первого решения это выглядит так:
# Worker-side main loop for the first solution; a method of a class that is
# not shown in this listing.
# NOTE(review): indentation was lost in this listing, so the nesting described
# in the comments below is inferred - confirm against the original file.
def run(self):
try:
self.fetch_player_list_settings()
# Under start_condition: bump the shared frame counter, remember its value as
# this worker's current frame, then block until start_flag is set.
with self.start_condition:
self.frame_number.value += 1
self.current_frame = self.frame_number.value
self.start_condition.wait_for(lambda: self.start_flag.value)
# Main loop: runs until quit_event or pre_quit_event is set.
while (not self.quit_event.is_set()) and (not self.pre_quit_event.is_set()):
# Wait until the shared counter has moved past our frame (or pre-quit).
with self.condition:
self.condition.wait_for(lambda: self.current_frame < self.frame_number.value or self.pre_quit_event.is_set())
if self.pre_quit_event.is_set():
break
# Raise this worker's flag and notify deck_1_condition.
# presumably this is the condition the heartbeat waits on - verify.
with self.deck_1_condition:
self.deck_1_flag.value = True
self.deck_1_condition.notify_all()
# Wait for continue_flag (or pre-quit) before doing the work.
with self.continue_condition:
self.continue_condition.wait_for(lambda: self.continue_flag.value or self.pre_quit_event.is_set())
if self.pre_quit_event.is_set():
break
result = self.one_chunk() # Process one chunk of data
if result == "close":
break
self.current_frame += 1
# Lower the flag again and notify deck_1_condition.
with self.deck_1_condition:
self.deck_1_flag.value = False
self.deck_1_condition.notify_all()
# Shutdown: discard whatever is still queued for this worker.
# NOTE(review): if data_from_mother is a multiprocessing.Queue, qsize() is
# approximate and not implemented on every platform - confirm.
while (self.data_from_mother.qsize() > 0):
_ = self.data_from_mother.get()
# Queue a 'stopped' status and a close message for ourselves, run one_chunk()
# once more (presumably to consume them - verify), then tell the emitter to
# close.
self.data_from_mother.put({"type": "new-status", "status": 'stopped'})
self.data_from_mother.put({"type": "close"})
r = self.one_chunk()
self.to_emitter.send({'type': 'close'})
return
except Exception as e:
# Report the traceback to the emitter as an "error" message.
error_message = str(traceback.format_exc())
self.to_emitter.send({"type": "error", "error_message": error_message})
Подробнее здесь: [url]https://stackoverflow.com/questions/79317533/python-pyqt5-multiprocessing-vs-asyncio-custom-solution[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, планшеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Сантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия