Я реализую интерфейс Pub/Sub процесса «многие ко многим» с помощью python zmq, который можно создавать и удалять по мере необходимости в работающем приложении. Я использую прокси-сервер для подключения сокетов XSUB и XPUB и использую вариант zmq.proxy_steerable, чтобы можно было использовать управляющий сокет для отправки команды TERMINATE остановить прокси. Когда я это делаю, я получаю сообщение об ошибке, когда прокси-сервер получает команду TERMINATE.
Минимальный рабочий пример (MWE)
В следующем MWE для запуска управляемого прокси используется фоновый поток. После кратковременного ожидания в 3 секунды сигнал TERMINATE отправляется в управляющий сокет, что приводит к тому, что строка zmq.proxy_steerable в потоке вызывает ошибку.
< р>
Код: Выделить всё
mwe.pyКод: Выделить всё
import zmq
from threading import Thread
class Proxy:
def __init__(self, context: zmq.Context):
self.context = context
self.thread = Thread(
daemon=True,
target=self._run,
name='Proxy')
self.thread.start()
def _run(self):
publish_socket = self.context.socket(zmq.XPUB)
publish_socket.bind("inproc://subscribe")
subscribe_socket = self.context.socket(zmq.XSUB)
subscribe_socket.bind("inproc://publish")
control_socket = self.context.socket(zmq.SUB)
control_socket.connect("inproc://proxy")
control_socket.setsockopt_string(zmq.SUBSCRIBE, '')
zmq.proxy_steerable(
publish_socket, subscribe_socket,
control=control_socket)
def stop(self):
socket = self.context.socket(zmq.PUB)
socket.bind("inproc://proxy")
socket.send_string('TERMINATE')
self.thread.join()
self.thread = None
if __name__ == "__main__":
import time
context = zmq.Context()
proxy = Proxy(context)
time.sleep(3)
proxy.stop()
Код: Выделить всё
>> python mwe.py
Exception in thread Proxy:
Traceback (most recent call last):
File "/home/bellockk/.conda/envs/zmq/lib/python3.11/threading.py", line 1045, in _bootstrap_inner
self.run()
File "/home/bellockk/.conda/envs/zmq/lib/python3.11/threading.py", line 982, in run
self._target(*self._args, **self._kwargs)
File "/home/bellockk/Development/zeromq/mwe.py", line 21, in _run
zmq.proxy_steerable(
File "zmq/backend/cython/_proxy_steerable.pyx", line 56, in zmq.backend.cython._proxy_steerable.proxy_steerable
File "zmq/backend/cython/checkrc.pxd", line 28, in zmq.backend.cython.checkrc._check_rc
zmq.error.ZMQError: Operation not supported
Примечание: все работает нормально, если я позволю сбор мусора позаботится обо всем при закрытии программы, но для моего реального варианта использования мне нужно создать и отключить интерфейс Pub/Sub за несколько итераций, поэтому мне нужно иметь возможность полностью остановить прокси и перезапустить его. позже.
Я изучаю zmq всего несколько дней, поэтому, скорее всего, совершаю простую ошибку. Любая помощь очень ценится!
Подробнее здесь: https://stackoverflow.com/questions/781 ... -in-python
Мобильная версия