Обработка нескольких сообщений с помощью Mqttclient асинхронно и одновременноJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Обработка нескольких сообщений с помощью Mqttclient асинхронно и одновременно

Сообщение Anonymous »

Я разрабатываю программу, которая принимает сообщения из темы MQTT, и моя цель состоит в том, чтобы я мог принимать и обрабатывать несколько сообщений асинхронно.

Я использую клиенты eclipse :
https://www.eclipse.org/paho/files/java ... lient.html
https://www.eclipse.org/paho/ files/javadoc/org/eclipse/paho/client/mqttv3/MqttAsyncClient.html

Проблема в том, что несколько сообщений не обрабатываются одновременно, они все выполняются в той же теме. Я не очень хорошо понимаю разницу между использованием MqttClient и MqttAsyncClient. В javadoc говорится:

MqttClient


Облегченный клиент для общения с сервером MQTT. используя методы, которые
блокируются до завершения операции.


MqttAsyncClient

< blockquote>
Облегченный клиент для взаимодействия с сервером MQTT с использованием неблокирующих
методов, которые позволяют операции выполняться в фоновом режиме.


Мне также не очень понятна разница между использованием метода «subscribe» или «setCallback». Только с помощью «subscribe» вы можете объявить несколько прослушивателей:
setCallback


Задает прослушиватель обратного вызова, который будет использоваться для событий, которые происходят асинхронно.
подписаться
Подписаться на тему...


Попытка отправить десять сообщений одновременно . Мои тесты следующие:

public class FooListener implements IMqttMessageListener {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}

public class FooCallbackListener implements MqttCallback {

@Override
public void connectionLost(Throwable e) {
e.printStackTrace();
}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
//TODO:emtpy
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}

}


MqttClient и подпишитесь:

public class FooMqttClient {

public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();

try {
MqttClient myClient = new MqttClient(serverUri, clientId);
myClient.connect(connOpt);
myClient.subscribe("topic/foo", new FooListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}


Результаты:

Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[topic/foo], Message [Foo 0]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call:7bb52a01-03db-4870-8921-8d6432f2fe27], Topic[ topic/foo], Message [Foo 9]


MqttClient и setCallback:

public class FooMqttCallbackClient {

public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();

try {
MqttClient myClient = new MqttClient(serverUri, clientId);
myClient.connect(connOpt);
myClient.subscribe("topic/foo");
myClient.setCallback(new FooCallbackListener());
} catch (MqttException e) {
e.printStackTrace();
}
}
}


Результаты:

Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: 3d571fd4-906d-4a1f-b06f-a2f7d43b95b2], Topic[ topic/foo], Message [Foo 9]


MqttAsyncClient и подпишитесь:

public class FooAsyncMqttClient {
public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();

try {
MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
myClient.connect(connOpt);
Thread.sleep(1000);
myClient.subscribe("topic/foo", 1, new FooListener());
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}


Результаты:

Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: bbc21897-3c38-4f96-8dea-74522250afee], Topic[ topic/foo], Message [Foo 9]


MqttAsyncClient и setCallback

public class FooAsyncMqttCallbackClient {

public static void main(String[] args) {
MqttConnectOptions connOpt = new MqttConnectOptions();
connOpt.setCleanSession(true);
connOpt.setKeepAliveInterval(30);
String serverUri = "tcp://iot.eclipse.org:1883";
String clientId = UUID.randomUUID().toString();

try {
MqttAsyncClient myClient = new MqttAsyncClient(serverUri, clientId);
myClient.connect(connOpt);
Thread.sleep(1000);
myClient.subscribe("topic/foo", 1);
myClient.setCallback(new FooCallbackListener());
} catch (MqttException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}
}


Результаты:

Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 0]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 1]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 2]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 3]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 4]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 5]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 6]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 7]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 8]
Thread [ MQTT Call: 75b827c9-34fe-4e2f-a723-cf0f277b91d6], Topic[ topic/foo], Message [Foo 9]


Во всех моих тестах прослушиватели выполняются в одном потоке, а не одновременно. Как я могу обрабатывать сообщения одновременно и одновременно? В чем разница между MqttClient и MqttAsyncClient?

Решение:

public class FooExecutorListener implements IMqttMessageListener {

private ExecutorService pool = Executors.newFixedThreadPool(10);

class MessageHandler implements Runnable {
MqttMessage message;
String topic;

public MessageHandler(String topic, MqttMessage message) {
this.message = message;
this.topic = topic;
}

public void run() {
System.out.println("Thread [ " + Thread.currentThread().getName() +
"], Topic[ "+ topic + "], Message [" + message +"] ");
}
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
pool.execute(new MessageHandler(topic, message));
}

}


Результаты:

Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 0]
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 1]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 4]
Thread [ pool-2-thread-4], Topic[ topic/foo], Message [Foo 3]
Thread [ pool-2-thread-7], Topic[ topic/foo], Message [Foo 6]
Thread [ pool-2-thread-6], Topic[ topic/foo], Message [Foo 5]
Thread [ pool-2-thread-8], Topic[ topic/foo], Message [Foo 7]
Thread [ pool-2-thread-3], Topic[ topic/foo], Message [Foo 2]
Thread [ pool-2-thread-1], Topic[ topic/foo], Message [Foo 10]
Thread [ pool-2-thread-2], Topic[ topic/foo], Message [Foo 11]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 12]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 13]
Thread [ pool-2-thread-5], Topic[ topic/foo], Message [Foo 14]
Thread [ pool-2-thread-9], Topic[ topic/foo], Message [Foo 8]
Thread [ pool-2-thread-10], Topic[ topic/foo], Message [Foo 9]


Подробнее здесь: https://stackoverflow.com/questions/477 ... ncurrently
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»