После этого иногда при публикации начинается сбой, и он продолжает сбой, пока мы не перезапустим клиентскую службу.
Версия протокола MQTT, которую мы используем, — MQTTv5.
Код подключения:
класс Mqtt_Connection:
Код: Выделить всё
def __init__(self, host_name, broker_ip_addr, port, msg_counter, logger):
self._logger = logger
self._condition = threading.Condition()
self._broker_ip_addr = broker_ip_addr
self._port = port # port
self.rf_status = ""
self.id = msg_counter
self.response_received = False
self.client = mqtt.Client(protocol=mqtt.MQTTv5)
def on_message(self, client, userdata, msg):
with self._condition:
self._logger.debug("Reply from node...Topic {0}".format(msg.topic))
self._logger.debug("Reply from node...Topic {0} Payload {1}".format(msg.topic, msg.payload))
if msg.payload != None or msg.payload != "":
try:
payload_dict = json.loads(msg.payload)
self._logger.debug("Reply: {}".format(payload_dict))
if "id" in payload_dict:
self.rf_status = payload_dict["rf_status"]
status = payload_dict["status"]
id = payload_dict["id"]
self._logger.debug("Status : {} rf_status : {} id: {}".format(status, self.rf_status, id))
if self.id == id:
self.response_received = True
self._condition.notify()
except Exception as e:
self._logger.exception("Exception: {}".format(e))
def mqtt_connect(self, topic, payload):
self.client.on_message = self.on_message
self._logger.debug("Connecting to the Broker : {}".format(self._broker_ip_addr))
try:
connection_result = self.client.connect(host=self._broker_ip_addr, port=self._port, clean_start=True) # connect to broker
self.client.loop_start() # It starts a new thread, that calls the loop method at regular intervals in
# which on message will be processed
if connection_result == 0: # to verify the connection is successful or not. 0-true
# pub_result=None
with self._condition:
# subscription
topic_list = topic.split("/") # split the topic with "/" and get first two elements to subscribe parent topic
subscription_topic = topic_list[0] + "/" + topic_list[1] + "/"
self._logger.debug("Subscription Topic : {} ".format(subscription_topic))
rc = self.client.subscribe(subscription_topic, 0)
self._logger.debug("Subscribe Return Code {} ".format(rc[0]))
sub_result = rc[0] # rc[0] will return 0 if subscription successful
if sub_result == 0:
self._logger.debug("Topic {} has been subscribed".format(subscription_topic))
else:
raise exceptions.MQTTSubscribeException("Subscription Failed")
# subscription
# publishing
pub_result = self.client.publish(topic, payload)
pub_result.wait_for_publish(30)
self._logger.debug(
"Publish Return Code = {} Is published = {}".format(pub_result.rc, pub_result.is_published()))
if pub_result.is_published():
self._logger.debug(" Published : {} to the Topic {}".format(payload, topic))
self._condition.wait(300)
if not pub_result.is_published():
raise exceptions.MQTTPublishException("Publishing failed")
if self.response_received == False:
raise exceptions.TimeOutException("Timed out for RF4CNode")
return self.voice_command_status, self.rf_status
else:
raise exceptions.ConnectionException("MQTT Server Connection Failed")
except Exception as err:
self._logger.error(" MQTT Broker Connection Failed".format(err))
raise
finally:
self.client.disconnect()
self.client.loop_stop()
self._logger.debug(" Disconnected from the Broker : {}".format(self._broker_ip_addr))
При сбое значение pub_result.rc всегда равно 0, но is_published
code> всегда имеет значение false.
Клиент всегда может подписаться. только публикация начинается с ошибкой.
Подробнее здесь: https://stackoverflow.com/questions/790 ... nt-service