Я новичок в MQTT и клиенте HiveMQ, поэтому с моей стороны может возникнуть некоторое недопонимание. Также приветствуются любые идеи по улучшению или альтернативные подходы к моему коду.
Я использую брокер сообщений Mosquitto 2.0.22 на Windows Server 2019 (я бы предпочел Linux, но на данный момент я привязан к Windows). Конфигурация брокера:
Код: Выделить всё
max_queued_messages 8000
allow_anonymous true
persistence true
persistence_location
autosave_on_changes true
autosave_interval 60
sys_interval 10
log_type all
log_dest file
log_dest topic
Я использую cleanStart(true) и sessionExpiryInterval(0), поскольку, насколько мне известно, это гарантирует отмену всех подписок при любых новых подключениях или повторных подключениях. Затем я добавляю все необходимые подписки во время подключения через ConnectedListener.
Вот как я подключаюсь к брокеру:
Код: Выделить всё
val mqtt: Mqtt5BlockingClient by lazy {
Mqtt5Client.builder()
.identifier(mqttClientId)
.serverHost(brokerHost)
.serverPort(brokerPort)
.automaticReconnectWithDefaultConfig()
.addConnectedListener { connectContext ->
log.info("MQTT connectComplete(${connectContext.clientConfig}) → $brokerHost:$brokerPort")
sendOnlineStatusMessage(true)
taskExecutor.execute {
mqttOfflineBuffer.flush { mqttMessage ->
mqtt.publishWith()
.topic(mqttMessage.topic)
.payload(mqttMessage.payload.toByteArray())
.retain(mqttMessage.retainSeconds > 0)
.messageExpiryInterval(mqttMessage.retainSeconds)
.qos(MqttQos.AT_LEAST_ONCE)
.send()
}
}
onConnectedHandlers.forEach { it.invoke() } // for adding all subscriptions on re-connect
}
.addDisconnectedListener { connectContext ->
... # Code for logging disconnects and disconnect reasons
onDisconnectedHandlers.forEach { it.invoke() }
}
.willPublish()
... # Code to send offline message
.applyWillPublish()
.buildBlocking()
}
Код: Выделить всё
override fun open() {
if (mqtt.state.isConnected) {
log.warn ("Connection to MQTT message broker is already established.")
return
}
log.trace("Connecting to MQTT message broker '$brokerHost:$brokerPort' ...")
try {
val connAck = mqtt.connectWith()
.cleanStart(true)
.sessionExpiryInterval(0) // Session ends immediately when disconnected
.send()
val notString = if (connAck.isSessionPresent) "" else "NOT "
log.info("MQTT connection established (with client ID '$mqttClientId'). Old session ${notString}re-established.\n" +
"ConnAck restrictions:\n{}", connAck.restrictions)
} catch (e: Exception) {
log.error("Connection to MQTT message broker failed:", e)
}
}
Код: Выделить всё
private val areSubscriptionsDone = AtomicBoolean(false)
override fun run(vararg args: String) {
log.info("Initiating MQTT connection ...")
myMqttClient.registerOnConnectedHandler {
if (areSubscriptionsDone.compareAndSet(false, true)) {
subscribeToTopics()
} else {
log.debug("Subscriptions for this connection already installed.")
}
}
myMqttClient.registerOnDisconnectedHandler {
areSubscriptionsDone.set(false)
}
if (!myMqttClient.isConnected()) {
myMqttClient.open()
...
}
}
private fun subscribeToTopics() {
myMqttClient.subscribeAndHandle(pingTopic) { _, _, _ ->
log.info("→ Received ping message.")
myMqttClient.publishMessage(pongTopic, ...)
}
...
}
Код: Выделить всё
override fun subscribeAndHandle(topicFilter: String, handler: (topic: String, payload: String, isFromRetainedStore: Boolean) -> Unit) {
if (!mqtt.state.isConnected) throwOfflineException()
mqtt.toAsync()
.subscribeWith()
.topicFilter(topicFilter)
.retainHandling(Mqtt5RetainHandling.SEND) // default
.retainAsPublished(false) // default; ensures live forwards arrive with retain=false
.callback { publish ->
val topic = publish.topic.toString()
val payload = publish.payloadAsBytes.toString(Charsets.UTF_8)
val isFromRetainedStore = publish.isRetain
handler(topic, payload, isFromRetainedStore)
}
.send()
.whenComplete { subAck, throwable ->
... // logging
}
}
Все клиенты имеют уникальные идентификаторы клиентов. Темы имеют вид «устройства/[идентификатор-устройства]/подтема». Перезапуск клиента сбрасывает количество зарегистрированных приемов для одного сообщения (только один прием на сообщение). Но когда я перезапускаю брокера, оно увеличивается на единицу.
Подробнее здесь: https://stackoverflow.com/questions/798 ... qtt-client
Мобильная версия