Вот некоторые ограничения: < /p>
Проблемы < /strong> < /p>
- Оба компьютера должны общаться между собой (двунаправленная связь) с любой заданной скоростью, асинхронно (то есть, что A может отправлять несколько сообщений подряд к B, и наоборот). < /Li>
Прослушивание + обработка сообщений не должна блокировать. Это не должно быть проблемой для другого, но вместо этого воссоединиться, когда закрытый снова жив. подключение к одному или другому может быть сделано в любом порядке, который будет означает, что нет реального сервера и клиента.
Идеи
Эти проблемы, я подумал о том, чтобы сделать: < /p>
[*] Используя два разных розетки в пабах на компьютер, чтобы отделить обработку потоков слушателя и отправителя. Я думал о других паттернах (толкатель и т. Д.), Но я чувствую, что самый простой паб-Sub-лучший для этой конфигурации? > Из этого руководства кажется, что я не должен многопоточный разъем, поэтому мне было интересно, как это сделать. нет, и закрытие + перезагрузка контекста и розетки, если он обнаруживает, что другой компьютер больше не отвечает. < /li>
Я не так уверен в этой части, как: < /li>
< /ol>
Скажите, что у вас есть два узла, которые делают это, в этом порядке: < /p>
Подписчик подключается к Конечная точка и получает и считает сообщения.
Publisher связывает с конечной точкой и немедленно отправляет 1000 сообщений.
< /ul>
Тогда абонент, скорее всего, ничего не получит. Вы мигаете, убедитесь, что вы установите правильный фильтр и попробуйте еще раз, и подписчик все еще ничего не получит. Соединение на самом деле имеет значение ... особенно после сбоя a или b.
code
Я сделал первый черновик это работает при принятии во внимание 1. и 2., Но, как вы увидите, это беспорядок, и я не уверен, как его масштабировать, и сделать его лучше, прежде чем пытаться даже использовать сердцебиение ...
компьютер A < /p>
Код: Выделить всё
class A:
def __init__(self, A_to_B_addr, B_to_A_addr):
self.A_to_B_addr = A_to_B_addr
self.B_to_A_addr = B_to_A_addr
self.A_to_B_socket = None
self.B_to_A_socket = None
self.poller = None
self.context = None
self.setup_zmq_pub()
def setup_zmq_pub(self):
# Create a ZMQ context
self.context = zmq.Context()
# Set up a publisher socket to send output messages
self.A_to_B_socket = self.context.socket(zmq.PUB)
self.A_to_B_socket.bind(self.A_to_B_addr)
# Setup poller to handle socket events
self.poller = zmq.Poller()
self.poller.register(self.A_to_B_socket, zmq.POLLOUT)
# Wait for a short time to make sure it is set up
time.sleep(1)
self.send_to_B("A ready")
self.setup_zmq_sub()
def setup_zmq_sub(self):
# Set up a subscriber socket to listen for incoming messages
self.B_to_A_socket = self.context.socket(zmq.SUB)
self.B_to_A_socket.connect(self.B_to_A_addr)
# Subscribe to all messages
self.B_to_A_socket.setsockopt_string(zmq.SUBSCRIBE, "")
self.poller.register(self.B_to_A_socket, zmq.POLLIN)
print(f"Listening on {self.B_to_A_addr} and sending outputs to {self.A_to_B_addr}")
t = threading.Thread(target=self.zmq_listener).start()
def zmq_listener(self):
"""Continuously listens to B, to handle requests"""
while True:
socks = dict(self.poller.poll())
# Check if we can read from the B_to_A_socket
if self.B_to_A_socket in socks and socks[self.B_to_A_socket] == zmq.POLLIN:
# Receive a message
message = self.B_to_A_socket.recv_string()
# Deserialize the message from JSON format
try:
decoded_message = json.loads(message)
print(f"A Received message as JSON: {decoded_message}")
# Decodes the message and handles it
self.on_A_request(decoded_message)
except json.JSONDecodeError:
print("Error: Received message is not a valid JSON.")
def send_to_B(self, message):
"""Sends a zmq message to B computer"""
# Retries to send the message until it is received by B
while True:
socks = dict(self.poller.poll(timeout=1))
# Check if we can send on the dolci_to_B_socket
if self.A_to_B_socket in socks and socks[self.A_to_B_socket] == zmq.POLLOUT:
# Prepare the JSON object to send
json_message = json.dumps({"message": message})
print(f"Sending to B: {message}")
# Send the JSON object
self.A_to_B_socket.send_string(json_message)
break
else:
print(f"Message not sent: {message}")
time.sleep(1)
< /code>
компьютер b < /em>
очень похож на A, с подключениями, представляющими противоположности, немного другое соединение и сообщение «Готов» при запуске: < /p>
class B:
def __init__(self, B_to_A_addr, A_to_B_addr):
self.A_to_B_addr = A_to_B_addr
self.B_to_A_addr = B_to_A_addr
self.A_to_B_socket = None
self.B_to_A_socket = None
self.poller = None
self.context = None
self.A_ready = False
self.setup_zmq_pub()
self.send_to_A("B ready")
def setup_zmq(self):
# Create a ZMQ context
self.context = zmq.Context()
# Set up a subscriber socket to listen for incoming messages
self.A_to_B_socket = self.context.socket(zmq.SUB)
self.A_to_B_socket.connect(self.A_to_B_addr)
# Subscribe to all messages (empty string means receive all)
self.A_to_B_socket.setsockopt_string(zmq.SUBSCRIBE, "")
# Set up a publisher socket to send output messages
self.B_to_A_socket = self.context.socket(zmq.PUB)
self.B_to_A_socket.bind(self.B_to_A_addr)
# Set up the poller
self.poller = zmq.Poller()
self.poller.register(self.A_to_B_socket, zmq.POLLIN)
self.poller.register(self.B_to_A_socket, zmq.POLLOUT)
print(f"Listening on {self.A_to_B_addr} and sending outputs to {self.B_to_A_addr}")
t = threading.Thread(target=self.zmq_listener).start()
def on_B_request(self, request):
"""Handles a request from Dolci"""
# Removes some unexpected behavior
request = request['message']
if "A ready" in request:
self.A_ready = True
[...]
def send_to_A(self, message):
"""Sends a zmq message to A computer"""
while self.set_zmq:
socks = dict(self.poller.poll())
if self.dolci_ready and self.B_to_A_socket in socks and socks[self.B_to_A_socket] == zmq.POLLOUT:
[...]
< /code>
остальное аналогична. ()
Есть ли у вас какие -либо входные данные о том, что может быть здесь лучше, и как решить другие вопросы, обсуждаемые выше?
Подробнее здесь: https://stackoverflow.com/questions/794 ... -computers