Публикация MQTT Broker продолжает завершаться сбоем, пока мы не перезапустим клиентскую службу.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Публикация MQTT Broker продолжает завершаться сбоем, пока мы не перезапустим клиентскую службу.

Сообщение Anonymous »

Мы используем MQTT-брокер в kubernetes.
После этого иногда при публикации начинается сбой, и он продолжает сбой, пока мы не перезапустим клиентскую службу.
Версия протокола MQTT, которую мы используем, — MQTTv5.
Код подключения:
класс Mqtt_Connection:

Код: Выделить всё

def __init__(self, host_name, broker_ip_addr, port, msg_counter, logger):
self._logger = logger
self._condition = threading.Condition()
self._broker_ip_addr = broker_ip_addr
self._port = port  # port

self.rf_status = ""
self.id = msg_counter
self.response_received = False

self.client = mqtt.Client(protocol=mqtt.MQTTv5)

def on_message(self, client, userdata, msg):

with self._condition:

self._logger.debug("Reply from node...Topic {0}".format(msg.topic))
self._logger.debug("Reply from node...Topic {0}  Payload {1}".format(msg.topic, msg.payload))

if msg.payload != None or msg.payload != "":
try:
payload_dict = json.loads(msg.payload)
self._logger.debug("Reply: {}".format(payload_dict))

if "id" in payload_dict:
self.rf_status = payload_dict["rf_status"]
status = payload_dict["status"]
id = payload_dict["id"]
self._logger.debug("Status : {} rf_status : {} id: {}".format(status, self.rf_status, id))

if self.id == id:
self.response_received = True
self._condition.notify()
except Exception as e:
self._logger.exception("Exception: {}".format(e))

def mqtt_connect(self, topic, payload):
self.client.on_message = self.on_message
self._logger.debug("Connecting to the Broker : {}".format(self._broker_ip_addr))

try:
connection_result = self.client.connect(host=self._broker_ip_addr, port=self._port, clean_start=True)  # connect to broker
self.client.loop_start()  # It starts a new thread, that calls the loop method at regular intervals in
# which on message will be processed

if connection_result == 0:  # to verify the connection is successful or not.  0-true
# pub_result=None
with self._condition:

# subscription
topic_list = topic.split("/")  # split the topic with "/" and get first two elements to subscribe parent topic
subscription_topic = topic_list[0] + "/" + topic_list[1] + "/"
self._logger.debug("Subscription Topic : {} ".format(subscription_topic))

rc = self.client.subscribe(subscription_topic, 0)
self._logger.debug("Subscribe Return Code {} ".format(rc[0]))
sub_result = rc[0]  # rc[0] will return 0 if subscription successful

if sub_result == 0:
self._logger.debug("Topic {} has been subscribed".format(subscription_topic))
else:
raise exceptions.MQTTSubscribeException("Subscription Failed")
# subscription

# publishing
pub_result = self.client.publish(topic, payload)
pub_result.wait_for_publish(30)
self._logger.debug(
"Publish Return Code = {}  Is published = {}".format(pub_result.rc, pub_result.is_published()))

if pub_result.is_published():
self._logger.debug(" Published : {} to the Topic {}".format(payload, topic))

self._condition.wait(300)

if not pub_result.is_published():
raise exceptions.MQTTPublishException("Publishing failed")

if self.response_received == False:
raise exceptions.TimeOutException("Timed out for RF4CNode")

return self.voice_command_status, self.rf_status
else:
raise exceptions.ConnectionException("MQTT Server Connection Failed")

except Exception as err:
self._logger.error(" MQTT Broker Connection Failed".format(err))
raise
finally:
self.client.disconnect()
self.client.loop_stop()
self._logger.debug(" Disconnected from the Broker : {}".format(self._broker_ip_addr))
После перезапуска клиента он начинает работать.
При сбое значение pub_result.rc всегда равно 0, но is_published
code> всегда имеет значение false.
Клиент всегда может подписаться. только публикация начинается с ошибкой.

Подробнее здесь: https://stackoverflow.com/questions/790 ... nt-service
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Публикация MQTT Broker продолжает завершаться сбоем, пока мы не перезапустим клиентскую службу.
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Публикация MQTT Broker продолжает завершаться сбоем, пока мы не перезапустим клиентскую службу.
    Anonymous » » в форуме Python
    0 Ответы
    12 Просмотры
    Последнее сообщение Anonymous
  • Публикация MQTT Broker продолжает завершаться сбоем, пока мы не перезапустим клиентскую службу.
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Запустите MQTT Broker в iOS
    Гость » » в форуме IOS
    0 Ответы
    20 Просмотры
    Последнее сообщение Гость
  • Невозможно проанализировать сообщения MQTT непосредственно из темы Broker.
    Anonymous » » в форуме JAVA
    0 Ответы
    31 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»