Повторяющиеся пакеты и периодические остановки публикации с использованием клиента HiveMQJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Повторяющиеся пакеты и периодические остановки публикации с использованием клиента HiveMQ

Сообщение Anonymous »

Я разрабатываю приложение для Android, которое публикует пакеты ЭКГ с использованием клиентской библиотеки Java HiveMQ MQTT брокеру HiveMQ. Приложение работает следующим образом:
Пользователь вводит идентификатор пользователя (тема), и после нажатия кнопки Начать публикацию запланированное задание генерирует пакеты данных каждые 900 мс. и добавляет их в очередь. После добавления пакета в очередь задача вызывает методPublishAllQueuedPackets, который публикует пакеты в очереди с QoS 1 через Mqtt5AsyncClient. Наконец, после публикации очередь очищается.
Однако я постоянно сталкиваюсь с двумя проблемами:
  • Повторная публикация ранее отправленных пакетов:
    Некоторые ранее отправленные пакеты в очереди отправляются повторно несколько раз (20–40 раз). Я подозреваю, что это может быть связано либо с неправильным обращением с очередью (например, предыдущая операция публикации требует времени, из-за чего сообщения в очереди не очищаются и впоследствии не рассматриваются в следующем цикле публикации, что фактически публикует некоторые пакеты дважды), либо с асинхронной проблемой. с помощью методаPublishAllQueuedPackets. Эта проблема в основном наблюдается, когда клиент отключается и снова подключается.
  • Периодическая остановка публикации, несмотря на стабильное подключение:
    Иногда, даже когда клиент подключен при стабильном подключении к Интернету процесс публикации приостанавливается, а затем возобновляется случайным образом через некоторое время.
Примечание: моя реализация публикации пакетов может не соответствовать лучшие практики, так как я новичок в разработке MQTT и Android. Я открыт для любых предложений по улучшению. Также обратите внимание, что публикация пакетов брокеру по порядку важна.
Ниже приведен мой полный код для справки:
package com.example.hivemqpublisher;

import android.annotation.SuppressLint;
import android.os.Bundle;
import android.text.TextUtils;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.EditText;
import android.widget.TextView;

import androidx.appcompat.app.AppCompatActivity;

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

import org.json.JSONArray;
import org.json.JSONObject;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MainActivity extends AppCompatActivity {

private static final String TAG = "HiveMQPublisher";
private static final double[] DATA = {0.0, -0.004, /* ... 2500 data points ... */};

private static final int PACKET_SIZE = 125;
private static final long GENERATE_INTERVAL = 900L; // 900ms

private Mqtt5AsyncClient mqttClient;
private ScheduledExecutorService scheduler;
private int packetNumber = 0;

private Queue packetQueue = new ConcurrentLinkedQueue();

private EditText userIdEditText;
private Button startButton, stopButton;
private TextView clientStatusTextView, lastPublishedTextView, queueEndTextView;

@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);

userIdEditText = findViewById(R.id.userIdEditText);
startButton = findViewById(R.id.startPublishingButton);
stopButton = findViewById(R.id.stopPublishingButton);
clientStatusTextView = findViewById(R.id.clientStatusTextView);
lastPublishedTextView = findViewById(R.id.lastPublishedPacketTextView);
queueEndTextView = findViewById(R.id.queueEndPacketTextView);

startButton.setOnClickListener(v -> startPublishing());
stopButton.setOnClickListener(v -> stopPublishing());

Log.d(TAG, "onCreate: UI initialized.");
}

@SuppressLint("SetTextI18n")
private void startPublishing() {
Log.d(TAG, "startPublishing: Attempting to start publishing.");
String userId = userIdEditText.getText().toString().trim();
if (TextUtils.isEmpty(userId)) {
clientStatusTextView.setText("User ID is required!");
Log.e(TAG, "startPublishing: User ID is empty.");
return;
}

userIdEditText.setEnabled(false);
String topic = "hivemq/ff/" + userId;

// Create MQTT Client
Log.d(TAG, "startPublishing: Creating MQTT client.");
mqttClient = Mqtt5Client.builder()
.serverHost("broker.hivemq.com")
.identifier("android_ecg_publisher_" + userId)
.serverPort(8883)
.sslWithDefaultConfig()
.automaticReconnect()
.initialDelay(1, TimeUnit.SECONDS)
.maxDelay(10, TimeUnit.SECONDS)
.applyAutomaticReconnect()
.buildAsync();

// Connect to Broker
mqttClient.connectWith()
.cleanStart(true)
.keepAlive(1)
.sessionExpiryInterval(300)
.send()
.thenAccept(connAck -> {
Log.i(TAG, "Connected to MQTT Broker successfully.");
runOnUiThread(() -> clientStatusTextView.setText("Connected to MQTT Broker"));
startScheduler(topic);
})
.exceptionally(throwable -> {
Log.e(TAG, "Failed to connect to MQTT Broker: " + throwable.getMessage(), throwable);
runOnUiThread(() -> clientStatusTextView.setText("Failed to connect: " + throwable.getMessage()));
return null;
});

startButton.setVisibility(View.GONE);
stopButton.setVisibility(View.VISIBLE);
}

@SuppressLint("SetTextI18n")
private void stopPublishing() {
Log.d(TAG, "stopPublishing: Attempting to stop publishing.");
if (scheduler != null) {
scheduler.shutdownNow();
Log.d(TAG, "stopPublishing: Scheduler shut down.");
}

packetQueue.clear(); // Clear the queue to reset
Log.d(TAG, "stopPublishing: Packet queue cleared.");

if (mqttClient != null && mqttClient.getState().isConnected()) {
mqttClient.disconnect()
.thenRun(() -> Log.i(TAG, "MQTT Client disconnected"))
.exceptionally(throwable -> {
Log.e(TAG, "Failed to disconnect MQTT client: " + throwable.getMessage(), throwable);
return null;
});
}

packetNumber = 1; // Reset the packet number
runOnUiThread(() -> {
startButton.setVisibility(View.VISIBLE);
stopButton.setVisibility(View.GONE);
clientStatusTextView.setText("Publishing stopped");
lastPublishedTextView.setText("Last Published Packet No: 0");
queueEndTextView.setText("Queue End Packet No: 0");
});
}

private void startScheduler(String topic) {
Log.d(TAG, "startScheduler: Starting scheduler for packet generation.");
scheduler = Executors.newScheduledThreadPool(1);

scheduler.scheduleWithFixedDelay(() -> generatePacket(topic), 0, GENERATE_INTERVAL, TimeUnit.MILLISECONDS);
}

private void generatePacket(String topic) {
try {
Log.d(TAG, "generatePacket: Preparing data for packetNumber: " + packetNumber);
int startIdx = (packetNumber * PACKET_SIZE) % DATA.length;

// Prepare Data Packet
JSONArray jsonArray = new JSONArray();
for (int i = startIdx; i < startIdx + PACKET_SIZE; i++) {
jsonArray.put(DATA[i % DATA.length]);
}

JSONObject packetJson = new JSONObject();
packetJson.put("type", "livecg-data");
packetJson.put("data", jsonArray);
packetJson.put("packetNo", packetNumber);
packetJson.put("Timestamp", System.currentTimeMillis());

packetQueue.add(packetJson);
Log.d(TAG, "generatePacket: Packet added to queue: " + packetNumber);
packetNumber++;

runOnUiThread(() -> queueEndTextView.setText("Queue End Packet No: " + (packetNumber - 1)));
publishAllQueuedPackets(topic);

} catch (Exception e) {
Log.e(TAG, "generatePacket: Error preparing packet: " + e.getMessage(), e);
}
}

private void publishAllQueuedPackets(String topic) {
if (packetQueue.isEmpty()) {
return; // Nothing to publish
}

JSONArray batch = new JSONArray();
AtomicInteger lastPacketNumber = new AtomicInteger(-1); // Atomic wrapper for lambda compatibility

// Build the batch and keep track of the last packet number
for (JSONObject packet : packetQueue) {
batch.put(packet);
lastPacketNumber.set(packet.optInt("packetNo", -1)); // Update the atomic variable
}

mqttClient.publishWith()
.topic(topic)
.qos(MqttQos.AT_LEAST_ONCE)
.payload(batch.toString().getBytes())
.send()
.thenAccept(publishResult -> {
int finalPacketNumber = lastPacketNumber.get(); // Get the last packet number

Log.i(TAG, "publishAllQueuedPackets: Published batch successfully. Last packet: " + finalPacketNumber);

// Update UI with the last published packet
runOnUiThread(() -> {
lastPublishedTextView.setText("Last Published Packet No: " + finalPacketNumber);
});

// Clear the queue after successful publish
packetQueue.clear();
})
.exceptionally(throwable -> {
Log.e(TAG, "publishAllQueuedPackets: Failed to publish batch: " + throwable.getMessage(), throwable);
// Retain the queue for retrying later
return null;
});
}
}



Подробнее здесь: https://stackoverflow.com/questions/792 ... emq-client
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»