Запретить несколько подписок на одну и ту же тему в клиенте HiveMQ MQTTJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Запретить несколько подписок на одну и ту же тему в клиенте HiveMQ MQTT

Сообщение Anonymous »

Я изо всех сил пытаюсь с моими MQTT-клиентами подписаться на тему, не подписываясь на тему несколько раз после того, как клиент некоторое время работает. (У меня также есть проблема с тем, что клиенты больше не обрабатывают входящие сообщения по некоторым темам после того, как клиент некоторое время работает (возможно, потому, что брокер забывает подписки), но это другая тема, которую нужно задать в другом вопросе. Но это была причина, по которой я перешел с постоянных сеансов на cleanStart(true) + sessionExpiryInterval(0) в сочетании с настройкой подписок в addConnectedListener. (Это не так. помогите.))
Я новичок в MQTT и клиенте HiveMQ, поэтому с моей стороны может возникнуть некоторое недопонимание. Также приветствуются любые идеи по улучшению или альтернативные подходы к моему коду.
Я использую брокер сообщений Mosquitto 2.0.22 на Windows Server 2019 (я бы предпочел Linux, но на данный момент я привязан к Windows). Конфигурация брокера:

Код: Выделить всё

max_queued_messages 8000
allow_anonymous true

persistence true
persistence_location 
autosave_on_changes true
autosave_interval 60

sys_interval 10
log_type all
log_dest file 
log_dest topic
Я использую MQTT 5. В качестве клиентов я использую Java-клиент HiveMQ 1.3.10 со Spring Boot и Kotlin. Большинство клиентов работают на нескольких Raspberry Pi, а один клиент работает на компьютере Windowns, на котором работает брокер.
Я использую cleanStart(true) и sessionExpiryInterval(0), поскольку, насколько мне известно, это гарантирует отмену всех подписок при любых новых подключениях или повторных подключениях. Затем я добавляю все необходимые подписки во время подключения через ConnectedListener.
Вот как я подключаюсь к брокеру:

Код: Выделить всё

    val mqtt: Mqtt5BlockingClient by lazy {
Mqtt5Client.builder()
.identifier(mqttClientId)
.serverHost(brokerHost)
.serverPort(brokerPort)
.automaticReconnectWithDefaultConfig()
.addConnectedListener { connectContext ->
log.info("MQTT connectComplete(${connectContext.clientConfig}) → $brokerHost:$brokerPort")
sendOnlineStatusMessage(true)
taskExecutor.execute {
mqttOfflineBuffer.flush { mqttMessage ->
mqtt.publishWith()
.topic(mqttMessage.topic)
.payload(mqttMessage.payload.toByteArray())
.retain(mqttMessage.retainSeconds > 0)
.messageExpiryInterval(mqttMessage.retainSeconds)
.qos(MqttQos.AT_LEAST_ONCE)
.send()
}
}
onConnectedHandlers.forEach { it.invoke() }  // for adding all subscriptions on re-connect
}
.addDisconnectedListener { connectContext ->
... # Code for logging disconnects and disconnect reasons
onDisconnectedHandlers.forEach { it.invoke() }
}
.willPublish()
... # Code to send offline message
.applyWillPublish()
.buildBlocking()
}

Код: Выделить всё

    override fun open() {
if (mqtt.state.isConnected) {
log.warn ("Connection to MQTT message broker is already established.")
return
}

log.trace("Connecting to MQTT message broker '$brokerHost:$brokerPort' ...")
try {
val connAck = mqtt.connectWith()
.cleanStart(true)
.sessionExpiryInterval(0)  // Session ends immediately when disconnected
.send()
val notString = if (connAck.isSessionPresent) "" else "NOT "
log.info("MQTT connection established (with client ID '$mqttClientId').  Old session ${notString}re-established.\n" +
"ConnAck restrictions:\n{}", connAck.restrictions)
} catch (e: Exception) {
log.error("Connection to MQTT message broker failed:", e)
}
}
При запуске клиента он запускается автоматически

Код: Выделить всё

    private val areSubscriptionsDone = AtomicBoolean(false)

override fun run(vararg args: String) {
log.info("Initiating MQTT connection ...")
myMqttClient.registerOnConnectedHandler {
if (areSubscriptionsDone.compareAndSet(false, true)) {
subscribeToTopics()
} else {
log.debug("Subscriptions for this connection already installed.")
}
}
myMqttClient.registerOnDisconnectedHandler {
areSubscriptionsDone.set(false)
}

if (!myMqttClient.isConnected()) {
myMqttClient.open()
...
}
}

private fun subscribeToTopics() {
myMqttClient.subscribeAndHandle(pingTopic) { _, _, _ ->
log.info("→ Received ping message.")
myMqttClient.publishMessage(pongTopic, ...)
}
...
}
А вот как я подписываюсь на темы:

Код: Выделить всё

    override fun subscribeAndHandle(topicFilter: String, handler: (topic: String, payload: String, isFromRetainedStore: Boolean) -> Unit) {
if (!mqtt.state.isConnected) throwOfflineException()

mqtt.toAsync()
.subscribeWith()
.topicFilter(topicFilter)
.retainHandling(Mqtt5RetainHandling.SEND)  // default
.retainAsPublished(false)  // default; ensures live forwards arrive with retain=false
.callback { publish ->
val topic = publish.topic.toString()
val payload = publish.payloadAsBytes.toString(Charsets.UTF_8)
val isFromRetainedStore = publish.isRetain
handler(topic, payload, isFromRetainedStore)
}
.send()
.whenComplete { subAck, throwable ->
... // logging
}
}
В файлах журналов клиентов я вижу, что прием сообщения MQTT регистрируется несколько раз (например, «→ Полученное сообщение ping», и это журналирование запускается обработчиком темы), когда я отправлял одно сообщение. В файле журнала брокера я вижу, что отправлено только одно сообщение, поэтому я предполагаю, что брокер имеет только одну подписку на эту тему для этого клиента.
Все клиенты имеют уникальные идентификаторы клиентов. Темы имеют вид «устройства/[идентификатор-устройства]/подтема». Перезапуск клиента сбрасывает количество зарегистрированных приемов для одного сообщения (только один прием на сообщение). Но когда я перезапускаю брокера, оно увеличивается на единицу.

Подробнее здесь: https://stackoverflow.com/questions/798 ... qtt-client
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»