Многопроцессорность Python PyQt5 против asyncio (индивидуальное решение)Python

Программы на Python
Ответить
Anonymous
 Многопроцессорность Python PyQt5 против asyncio (индивидуальное решение)

Сообщение Anonymous »

pyqt5: main_ui.py
from PyQt5 import QtCore, QtGui, QtWidgets

class Ui_MainWindow(object):
def setupUi(self, MainWindow):
MainWindow.setObjectName("MainWindow")
MainWindow.resize(600, 32)
self.centralwidget = QtWidgets.QWidget(MainWindow)
self.centralwidget.setObjectName("centralwidget")
self.horizontalLayout = QtWidgets.QHBoxLayout(self.centralwidget)
self.horizontalLayout.setObjectName("horizontalLayout")
self.label_1 = QtWidgets.QLabel(self.centralwidget)
self.label_1.setText("")
self.label_1.setAlignment(QtCore.Qt.AlignCenter)
self.label_1.setObjectName("label_1")
self.horizontalLayout.addWidget(self.label_1)
MainWindow.setCentralWidget(self.centralwidget)
MainWindow.setWindowTitle("PyQt5 with Processes or asyncio")

Решение 1 QThread/Process: main.py
from main_ui import Ui_MainWindow
from PyQt5 import QtCore, QtGui, QtWidgets
import sys, os

from PyQt5.QtCore import pyqtSignal, QThread, QTimer
from multiprocessing import Process, Queue, Pipe

import random

import threading
import time

class Main:

def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.MainWindow = QtWidgets.QMainWindow()
self.ui = Ui_MainWindow()
self.ui.setupUi(self.MainWindow)
self.MainWindow.show()

self.create_process()

'''
self.put_data_timer = QTimer()
self.put_data_timer.timeout.connect(lambda: self.put_data())
self.put_data_timer.setSingleShot(False)
self.put_data_timer.start(1000)
'''

self.run_thread = True
self.put_data_thread = threading.Thread(target=self.put_data_thread_method)
self.put_data_thread.start()

self.MainWindow.closeEvent = lambda event: self.closeEvent(event)

sys.exit(self.app.exec_())

def put_data(self):
random_int = random.randint(0, 10000)
self.queue.put({'type':'input_data','input_data':random_int})

def put_data_thread_method(self):
while(self.run_thread):
random_int = random.randint(0, 10000)
self.queue.put({'type':'input_data','input_data':random_int})
time.sleep(0.020)

def create_process(self):
self.mother_pipe, self.child_pipe = Pipe()
self.queue = Queue()

self.emitter = Emitter(self.mother_pipe)
self.emitter.error_signal.connect(lambda error_message: print(error_message))
self.emitter.one_signal.connect(lambda data: self.ui.label_1.setText(data))
self.emitter.start()

self.child_process = Child_Proc(self.child_pipe, self.queue)
self.child_process.start()

def closeEvent(self,event):
self.run_thread = False
self.put_data_thread.join()
#self.put_data_timer.stop()
self.queue.put({'type':'close'})
self.child_process.join()
self.child_process.terminate()
self.emitter.quit()
event.accept()

class Emitter(QThread):
error_signal = pyqtSignal(str)
one_signal = pyqtSignal(str)

def __init__(self, from_process: Pipe):
super().__init__()
self.data_from_process = from_process

def run(self):
while True:
try:
data = self.data_from_process.recv()
if data["type"] == "error":
self.error_signal.emit(data["error_message"])
elif data["type"] == "data":
self.one_signal.emit(data["data"])
elif data["type"] == "close":
return None
except:
error_message = traceback.format_exc()
self.error_signal.emit(error_message)
return None

class Child_Proc(Process):

def __init__(self, to_emitter, from_mother):
try:
super().__init__()
self.daemon = False
self.to_emitter = to_emitter
self.data_from_mother = from_mother
except:
error_message = str(traceback.format_exc())
to_emitter.put({"type": "error", "error_message": error_message})
os._exit(1)

def run(self):
while(True):
data = self.data_from_mother.get()
if data['type'] == 'close':
self.to_emitter.send(data)
return None
elif data['type'] == 'input_data':
input_data = data['input_data']
input_data *= 10
self.to_emitter.send({'type':'data','data':str(input_data)})

if __name__ == "__main__":
program = Main()

Solution2 asyncio (пользовательское): main_asyncio.py
from main_ui import Ui_MainWindow
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QObject, pyqtSignal
import sys
import random
import time
import asyncio
import qasync
from aioprocessing import AioProcess, AioQueue

import threading
import time

class Main:
def __init__(self):
self.app = QtWidgets.QApplication(sys.argv)
self.loop = qasync.QEventLoop(self.app)
asyncio.set_event_loop(self.loop)

self.MainWindow = QtWidgets.QMainWindow()
self.ui = Ui_MainWindow()
self.ui.setupUi(self.MainWindow)
self.MainWindow.show()

# Setup inter-process communication
self.input_queue = AioQueue()
self.output_queue = AioQueue()
self.process = AioProcess(target=process_run, args=(self.input_queue, self.output_queue))
self.process.start()

self.run_thread = True
self.put_data_thread = threading.Thread(target=self.put_data_thread_method)
self.put_data_thread.start()

asyncio.ensure_future(self.listen_to_output_queue())
self.MainWindow.closeEvent = self.closeEvent

sys.exit(self.app.exec_())

async def listen_to_output_queue(self):
while True:
try:
data = await self.output_queue.coro_get()
if data["type"] == "error":
print(f"Error: {data['message']}")
elif data["type"] == "data":
self.ui.label_1.setText(data["message"])
except asyncio.CancelledError:
break

def put_data_thread_method(self):
while self.run_thread:
random_int = random.randint(0, 10000)
self.input_queue.put({"type": "input_data", "input_data": random_int})
time.sleep(0.020)

def closeEvent(self, event):
self.run_thread = False
self.put_data_thread.join()
self.input_queue.put({"type": "close"})
self.process.join()
event.accept()

def process_run(input_queue, output_queue):
while True:
data = input_queue.get()
if data["type"] == "close":
break
elif data["type"] == "input_data":
result = data["input_data"] * 10
output_queue.put({"type": "data", "message": str(result)})

if __name__ == "__main__":
Main()

Оба решения работают, но для решения 2 я не полностью протестировал.
Кроме того, я не знаю, как улучшить решение 2 с помощью функций asyncio/pyqt5 asyncio.Я хочу видеть время каждой реализации, принимать более быструю и игнорировать более медленную.
Также обратите внимание, что для двух решений, поскольку пары или QThread/Process их много там это такой поток синхронизации:
import threading
import traceback
from multiprocessing import Condition, Value, Event
import time

class Sync_processes:
def __init__(self, main_self):
self.main_self = main_self

self.condition = Condition()

self.start_condition = Condition()
self.continue_condition = Condition()

self.frame = Value('q',100, lock=False) # Shared frame counter

# Shared boolean variable, initialized to False
self.start_flag = Value('b', False, lock=False)
self.continue_flag = Value('b', False, lock=False)
self.quit_event = Event()
self.pre_quit_event = Event()

self.deck_1_flag = Value('b', False, lock=False)
self.deck_2_flag = Value('b', False, lock=False)
self.music_clip_deck_flag = Value('b', False, lock=False)
self.final_slice_flag = Value('b', False, lock=False)
self.final_slice_plot_flag = Value('b', False, lock=False)

self.flag_condition = Condition()

def start(self):
with self.start_condition:
self.start_flag.value = True
self.start_condition.notify_all()
if 'tr' not in dir(self):
self.tr = threading.Thread(target=self.infinite_heartbeat, args=(self.main_self.configuration,))
self.tr.start()

def infinite_heartbeat(self, configuration):
try:
sync_cycle_ms = configuration.get("sync_cycle_ms", 100)
if not isinstance(sync_cycle_ms, (int, float)) or sync_cycle_ms 0:
if not self.pre_quit_event.is_set():
time.sleep(time_to_sleep)
else:
time_to_sleep = sync_cycle_seconds / 5
if not self.pre_quit_event.is_set():
time.sleep(time_to_sleep)

with self.condition:
self.continue_flag.value = False
if not self.pre_quit_event.is_set():
self.frame.value += 1
else:
self.frame.value += 1000
self.condition.notify_all()
if self.quit_event.is_set():
return

if not self.pre_quit_event.is_set():
with self.flag_condition:
self.flag_condition.wait_for(lambda: (self.quit_event.is_set() or (self.deck_1_flag.value and self.deck_2_flag.value and self.music_clip_deck_flag.value and self.final_slice_flag.value)))
if self.quit_event.is_set():
return

time.sleep(sync_cycle_seconds/300)
with self.continue_condition:
self.continue_flag.value = True
self.continue_condition.notify_all()
if self.quit_event.is_set():
return
except Exception as e:
print(traceback.format_exc())

def close(self):
self.quit_event.set()
self.tr.join()


и для первого решения это выглядит так:
def run(self):
try:
self.fetch_player_list_settings()

with self.start_condition:
self.frame_number.value += 1
self.current_frame = self.frame_number.value
self.start_condition.wait_for(lambda: self.start_flag.value)

while (not self.quit_event.is_set()) and (not self.pre_quit_event.is_set()):

with self.condition:
self.condition.wait_for(lambda: self.current_frame < self.frame_number.value or self.pre_quit_event.is_set())
if self.pre_quit_event.is_set():
break

with self.deck_1_condition:
self.deck_1_flag.value = True
self.deck_1_condition.notify_all()

with self.continue_condition:
self.continue_condition.wait_for(lambda: self.continue_flag.value or self.pre_quit_event.is_set())
if self.pre_quit_event.is_set():
break

result = self.one_chunk() # Process one chunk of data
if result == "close":
break

self.current_frame += 1

with self.deck_1_condition:
self.deck_1_flag.value = False
self.deck_1_condition.notify_all()

while (self.data_from_mother.qsize() > 0):
_ = self.data_from_mother.get()
self.data_from_mother.put({"type": "new-status", "status": 'stopped'})
self.data_from_mother.put({"type": "close"})
r = self.one_chunk()
self.to_emitter.send({'type': 'close'})
return
except Exception as e:
error_message = str(traceback.format_exc())
self.to_emitter.send({"type": "error", "error_message": error_message})


Подробнее здесь: https://stackoverflow.com/questions/793 ... m-solution
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»