, прежде чем задавать вопрос, я знаю о людях, имеющих подобные проблемы, и рассмотрел аналогичные вопросы, но я не нашел ответа. Отсюда я сделал пример, он работает хорошо. Тем не менее, я хочу небольшую модификацию. Я написал свой собственный класс, который должен работать на ветке без того, чтобы мне не приходилось делать весь код шаблона, например: < /p>
def task_to_run(self)
... # a long task that I want to run on a thread
worker = JZWorker(self.task_to_run, None, "Name")
worker.start()
< /code>
И это класс, который я построил < /p>
from PySide6.QtCore import QObject, QThread, Signal
from loguru import logger
class Worker(QObject):
finished = Signal()
def __init__(self, task, finished_callback, name = "") -> None:
super().__init__()
self._task = task
self._finished = False
self._name = name or f"JZWorker-{self._task.__name__}"
if finished_callback:
self.finished.connect(fin_cb)
self.finished.connect(self._on_finished)
def run(self) -> None:
logger.debug(f"Running separate task in a JZWorker thread: {self._name}")
try:
self._task()
except Exception as ex:
logger.exception(f"Error in worker thread {self._name}: {ex}")
self.finished.emit()
def _on_finished(self) -> None:
self._finished = True
def is_finished(self) -> bool:
return self._finished
class JZWorker(QObject):
"""
JZWorker is a class that can be used to run a non-blocking task in a separate thread without having to do all of the boilerplate code
"""
finished = Signal()
def __init__(
self, task, fin_cb = None, name = "") -> None:
super().__init__()
self._thread = QThread()
self._worker = Worker(task, fin_cb, name)
self._worker.moveToThread(self._thread)
self._thread.started.connect(self._worker.run)
self._worker.finished.connect(self._thread.quit)
self._worker.finished.connect(self._worker.deleteLater)
self._worker.finished.connect(self.finished)
self._thread.finished.connect(self._thread.deleteLater)
# self._worker.finished.connect(
# lambda: logger.warning(f"Worker thread {self._worker._name} finished"))
# self._thread.finished.connect(
# lambda: logger.error(f"Thread for worker {self._worker._name} finished"))
def start(self) -> None:
"""
Starts the thread and runs the task in a separate thread. This is a non-blocking call.
"""
self._thread.start()
< /code>
Проблема в том, что даже если поток закончен на 100%, готовый сигнал самостоятельно. Тем не менее, не пострадая от строки ниже, останавливает мое приложение от сбоя при установке Worker = none , но сигнал _Thread.Finefice все еще никогда не испускается. Я предполагаю, что он должен сделать что -то со ссылкой на работник, все еще существующий, чтобы коллекционер мусора не собирал его, но я не уверен. < /P>
self._thread.finished.connect(lambda: logger.error(f"Thread for worker {self._worker._name} finished"))
Я анализирую пример на странице, которая была предоставлена, и нет ответа, который я могу найти.
===================================================================================================================================== мой класс. Нажатие кнопки «Стоп» всегда сбивает мое приложение, потому что Qthread не закончена. < /P>
from PySide6.QtCore import QObject, QThread, Signal, QMargins, Slot
from PySide6.QtWidgets import QMainWindow, QApplication, QWidget, QVBoxLayout, QPushButton
import sys
import time
class Worker(QObject):
finished = Signal()
def __init__(self, task, finished_callback, name = "") -> None:
super().__init__()
self._task = task
self._finished = False
self._name = name or f"JZWorker-{self._task.__name__}"
if finished_callback:
self.finished.connect(finished_callback)
self.finished.connect(self._on_finished)
def run(self) -> None:
print(f"Running separate task in a JZWorker thread: {self._name}")
try:
self._task()
except Exception as ex:
print(f"Error in worker thread {self._name}: {ex}")
self.finished.emit()
def _on_finished(self) -> None:
self._finished = True
def is_finished(self) -> bool:
return self._finished
class JZWorker(QObject):
"""
JZWorker is a class that can be used to run a non-blocking task in a separate thread without having to do all of the boilerplate code
"""
finished = Signal()
def __init__(
self, task, fin_cb = None, name = "") -> None:
super().__init__()
self._thread = QThread()
self._worker = Worker(task, fin_cb, name)
self._worker.moveToThread(self._thread)
self._thread.started.connect(self._worker.run)
self._worker.finished.connect(self._thread.quit)
self._worker.finished.connect(self._worker.deleteLater)
self._worker.finished.connect(self.finished)
self._thread.finished.connect(self._thread.deleteLater)
def start(self) -> None:
"""
Starts the thread and runs the task in a separate thread. This is a non-blocking call.
"""
self._thread.start()
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
top_layout = QVBoxLayout(contentsMargins=QMargins(0, 0, 0, 0))
central_widget = QWidget()
central_widget.setLayout(top_layout)
self.setCentralWidget(central_widget)
self.button_start1 = QPushButton("Start")
self.button_stop = QPushButton("Stop")
top_layout.addWidget(self.button_start1)
top_layout.addWidget(self.button_stop)
self.worker = None
self.worker_finished = False
self.is_running = False
self.button_start1.clicked.connect(lambda: self.button_clicked(self.task1))
self.button_stop.clicked.connect(self.stop)
@Slot()
def button_clicked(self, task):
if self.worker:
print("Worker is already busy")
return
self.worker = JZWorker(task, self.worker_finished)
self.worker.start()
def task1(self):
self.is_running = True
while self.is_running:
time.sleep(1)
print("Sleeping in task 1")
self.worker_finished = True
@Slot()
def stop(self):
self.is_running = False
while not self.worker_finished:
time.sleep(0.1) # wait for worker to finish
print("This was reached meaning worker finished")
# I thought setting None here would work, because the thread has ended...
# following always crashes the app. I want to set worker to None
# if I call worker with another method, I need to make sure that
# the worker does not exist already and is doing something with
# a shared resource
self.worker = None
self.worker_finished = False
if __name__ == "__main__":
app = QApplication(sys.argv)
window = MainWindow()
window.show()
sys.exit(app.exec())
Подробнее здесь: https://stackoverflow.com/questions/797 ... ish-signal
Pyside6 qthread с работником никогда не излучает сигнал отделки ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
QCameraImageCapture не излучает сигнал imageCaptured после вызова функции capture()
Anonymous » » в форуме Python - 0 Ответы
- 78 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Qt/PySide6: Как лучше всего реализовать бесконечный цикл выборки данных с помощью QThread?
Anonymous » » в форуме Python - 0 Ответы
- 16 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Qt/PySide6: Как лучше всего реализовать бесконечный цикл выборки данных с помощью QThread?
Anonymous » » в форуме Python - 0 Ответы
- 14 Просмотры
-
Последнее сообщение Anonymous
-