Полученные дублируемые сообщения, если несколько подписчиво совпадают с входящим сообщениемJAVA

Программисты JAVA общаются здесь
Anonymous
 Полученные дублируемые сообщения, если несколько подписчиво совпадают с входящим сообщением

Сообщение Anonymous »

тестируется с помощью Paho версии 1.2.5
В этом примере я отправляю сообщение в корень назначения/MSG/1/data . У меня есть две подписки: root/msg/1/# и root/msg/+/#. Обе подписки соответствуют сообщению, отправленному в Root/MSG/1/DATA . Я ожидал, что слушатель сообщений будет вызван один раз для каждой подписки. Однако, к моему удивлению, слушатель сообщений был вызван дважды для каждой подписки.public class MqttPahoExample {

public static void main(String[] args) throws MqttException {
MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
.username("user")
.password("password".getBytes())
.cleanStart(false)
.requestResponseInfo(true)
.build();
mqttConnectionOptions.setSendReasonMessages(false);

MqttClient mqttClient = new MqttClient("tcp://localhost:61616", "Client-01");
mqttClient.connect(mqttConnectionOptions);

// Two subscriptions, both will match the incoming data
MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);
IMqttMessageListener iMqttMessageListener = (topic, message1) ->
System.out.println("topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());

MqttSubscription[] mqttSubscriptions = {mqttSubscription1, mqttSubscription2};
IMqttMessageListener[] mqttMessageListeners = {iMqttMessageListener, iMqttMessageListener};
mqttClient.subscribe(mqttSubscriptions, mqttMessageListeners);

// Publish message
MqttMessage message = new MqttMessage("TestMessage".getBytes());
message.setQos(2);
mqttClient.publish("root/msg/1/data", message);
}
}
< /code>
ожидаемый выход < /p>
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1
< /code>
Фактический вывод < /p>
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:2
topic:root/msg/1/data message:TestMessage id:1
topic:root/msg/1/data message:TestMessage id:1

< /code>
Я ожидал получить 1 сообщение для каждой подписки. Предлагается @brits в ответе ниже, я попробовал тот же пример с идентификатором подписки. Но я все еще получал дополнительную копию сообщения для каждой перекрывающейся подписки. Похоже, это в основном зависит от реализации брокера, что то, как они интерпретировали это предложение из спецификации. Сообщение, одно для каждой дополнительной подписки, и уважение к QoS подписки в каждом случае. < /p>
< /blockquote>
Поэтому я пробовал разные брокеры MQTT < /p>

artemis - output < /li>
< /ol>
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:1
root/msg/1/# topic:root/msg/1/data message:TestMessage id:2
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2
< /code>

mosquitto - output < /li>
< /ol>
root/msg/1/# topic:root/msg/1/data message:TestMessage id:1
root/msg/+/# topic:root/msg/1/data message:TestMessage id:2

код с использованием идентификатора подписки
public class MqttPahoFinalAsync {

public static void main(String[] args) throws MqttException {
MqttConnectionOptionsBuilder builder = new MqttConnectionOptionsBuilder();
MqttConnectionOptions mqttConnectionOptions = builder.automaticReconnect(true)
.username("user")
.password("password".getBytes())
.cleanStart(true)
.requestReponseInfo(true)
.build();
mqttConnectionOptions.setUseSubscriptionIdentifiers(true);

MqttAsyncClient mqttClient = new MqttAsyncClient("tcp://localhost:1883", "Client-01");
mqttClient.connect(mqttConnectionOptions).waitForCompletion();

// 2 subscriptions, both will match the incoming data

// subscription 1
MqttProperties subProperties1 = new MqttProperties();
subProperties1.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
subProperties1.setSubscriptionIdentifier(1);

MqttSubscription mqttSubscription1 = new MqttSubscription("root/msg/1/#", 2);
IMqttMessageListener iMqttMessageListener1 = (topic, message1) ->
System.out.println("root/msg/1/# topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());

mqttClient.subscribe(mqttSubscription1, null, null, iMqttMessageListener1, subProperties1).waitForCompletion();

// subscription 2
MqttProperties subProperties2 = new MqttProperties();
subProperties2.setSubscriptionIdentifiers(List.of(0)); // paho forces to have it initialised like this
subProperties2.setSubscriptionIdentifier(2);
MqttSubscription mqttSubscription2 = new MqttSubscription("root/msg/+/#", 2);

IMqttMessageListener iMqttMessageListener2 = (topic, message1) ->
System.out.println("root/msg/+/# topic:" + topic + " message:" + new String(message1.getPayload(), StandardCharsets.UTF_8) + " id:" + message1.getId());

mqttClient.subscribe(mqttSubscription2, null, null, iMqttMessageListener2, subProperties2).waitForCompletion();

// Publish message
MqttMessage message = new MqttMessage("TestMessage".getBytes());
message.setQos(2);
mqttClient.publish("root/msg/1/data", message);
}
}


Подробнее здесь: https://stackoverflow.com/questions/794 ... ming-messa

Вернуться в «JAVA»