Невозможно проанализировать сообщения MQTT непосредственно из темы Broker.JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Невозможно проанализировать сообщения MQTT непосредственно из темы Broker.

Сообщение Anonymous »

Я новичок в java, а также во всей spring framework. Я пытаюсь разработать приложение в SpringBoot для использования в контексте MQTT.Приложение состоит из двух микросервисов: первый получает данные от брокера и сохраняет их в базе данных, а второй предоставляет конечные точки, позволяющие пользователям получать и выполнять операции с определенными данными.
Проблема заключается в первом микросервисе DataRetriever, который занимается чтением сообщений о погоде (написанных в формате json) из теме (данные о погоде) в брокере Mosquitto и контекстно анализируйте их, чтобы затем сохранить поля json в соответствующие таблицы в Postgres.
По сути, я не могу найти правильный способ передачи полезных данных сообщения из темы в Object Mapper.
Вся логика, которую я построил, основана на двух файлах:
  • Код: Выделить всё

    MqttBeans
    там, где я управляю логикой соединения, входящей и исходящей из каналов брокера, используется следующий код:

Код: Выделить всё

package com.example.demo.configs;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
* Setting up MQTT Client connection configurations
* the Client Factory
* the Channels: Inbound + Outbound
* the Message Handler
*/

@Configuration
public class MqttBeans {

// Client Factory (factory configs)
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();

// Options Settings
options.setServerURIs(new String[] {"tcp://localhost:1883"});
//options.setUserName("postgres");
//String password = "12345678";
//options.setPassword(password.toCharArray());
options.setCleanSession(true);

factory.setConnectionOptions(options);

return factory;
}

// Inbound Ch (Subscribing)
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}

@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(),"#");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());

return adapter;
}

// Msg Handler
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message message) throws MessagingException {

// Topic
String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // retrieving the topic from the message header
if (topic.equals("weather-data")) {
System.out.println("Here's the topic: " + topic); // printing out the topic
}
// Payload
String payload = message.getPayload().toString();
System.out.println("Here's the payload: " + payload);  // printing out any msg that comes in the ch
}
};
}

// Outbound Ch (Publishing)
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

// Msg Handler
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("serverOut", mqttClientFactory());
messageHandler.setAsync(true); // so that the client will always be up and listening
messageHandler.setDefaultTopic("weather-data");
messageHandler.setDefaultRetained(false);

return messageHandler;
}
}

  • Код: Выделить всё

    StartupUtility
    , который должен анализировать сообщения, написанные в теме брокера, сопоставлять их (с помощью objectMapper) с тем, что написано в моем CityEntity, а также создавать и заполнять таблицы в postgres (данными, которые он прочитал во время анализ полей json)
Проблема заключается в том, что, как видно из кода, написанного в StartupUtility

Код: Выделить всё

package com.example.demo.startup;

import com.example.demo.entities.CityEntity;
import com.example.demo.repos.CityWeatherRepo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

// Implementing CommandLineRunnerInterface to be init when proj is created
@Component
@Log
public class StartupUtility implements CommandLineRunner {

// Passing the json as a value to test the parsing logic
@Value("${demo.json.string}") private String json;

// Wiring the Inbound ch
@Autowired
private MessageChannel mqttInputChannel;

// Wiring the Repo
@Autowired private CityWeatherRepo repo;

@Override
public void run(String... args) throws Exception {

// Init Obj Mapper instance
ObjectMapper mapper = new ObjectMapper();

// Avoiding failure in case of unrecognized fields during json mapping
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

// Getting the json + Converting into the City Class
CityEntity value = mapper.readValue(json, CityEntity.class); // replace json with payload // input: json in string format, output: OpenAPI/CityEntity (class)

// Saving
CityEntity save = repo.save(value);

// Checking the Saving process
log.info(" Entity info " + save.toString());
}
}
json, который я собираюсь проанализировать, жестко закодирован в выделенной переменной "demo.json.string", записанной в файле application.properties. (я сделал это, чтобы проверить, верна ли хотя бы логика синтаксического анализа, и это так), при этом я хочу, чтобы jsons были теми, которые написаны в теме (погодные данные), вот так: р>

Код: Выделить всё

{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":802,"main":"Clouds","description":"scattered clouds","icon":"03d"}],"base":"stations","main":{"temp":291.47,"feels_like":290.74,"temp_min":289.18,"temp_max":292.64,"pressure":1009,"humidity":53},"visibility":10000,"wind":{"speed":5.14,"deg":140},"clouds":{"all":40},"dt":1714483329,"sys":{"type":2,"id":2075535,"country":"GB","sunrise":1714451607,"sunset":1714504902},"timezone":3600,"id":2643743,"name":"London","cod":200}
{"coord":{"lon":2.3488,"lat":48.8534},"weather":[{"id":804,"main":"Clouds","description":"overcast clouds","icon":"04d"}],"base":"stations","main":{"temp":291.56,"feels_like":291.21,"temp_min":290.03,"temp_max":292.64,"pressure":1012,"humidity":67},"visibility":10000,"wind":{"speed":6.69,"deg":140},"clouds":{"all":100},"dt":1714483519,"sys":{"type":2,"id":2012208,"country":"FR","sunrise":1714451474,"sunset":1714503847},"timezone":7200,"id":2988507,"name":"Paris","cod":200}
{"coord":{"lon":-85.1647,"lat":34.257},"weather":[{"id":502,"main":"Rain","description":"heavy intensity rain","icon":"10d"}],"base":"stations","main":{"temp":289.27,"feels_like":289.29,"temp_min":288.15,"temp_max":290.93,"pressure":1018,"humidity":90},"visibility":10000,"wind":{"speed":1.54,"deg":20},"rain":{"1h":5.05},"clouds":{"all":100},"dt":1714483081,"sys":{"type":2,"id":2038061,"country":"US","sunrise":1714474288,"sunset":1714523036},"timezone":-14400,"id":4219762,"name":"Rome","cod":200}
{"coord":{"lon":13.4105,"lat":52.5244},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":301.22,"feels_like":300.5,"temp_min":299.81,"temp_max":302.53,"pressure":1004,"humidity":34},"visibility":10000,"wind":{"speed":5.66,"deg":150},"clouds":{"all":0},"dt":1714483328,"sys":{"type":2,"id":2011538,"country":"DE","sunrise":1714448171,"sunset":1714501842},"timezone":7200,"id":2950159,"name":"Berlin","cod":200}
{"coord":{"lon":-74.006,"lat":40.7143},"weather":[{"id":701,"main":"Mist","description":"mist","icon":"50d"}],"base":"stations","main":{"temp":286.43,"feels_like":286.14,"temp_min":284.84,"temp_max":287.62,"pressure":1015,"humidity":89},"visibility":9656,"wind":{"speed":7.2,"deg":50},"clouds":{"all":100},"dt":1714483436,"sys":{"type":2,"id":2008101,"country":"US","sunrise":1714470912,"sunset":1714521057},"timezone":-14400,"id":5128581,"name":"New York","cod":200}
Моя идея заключалась в том, чтобы внедрить в StartupUtility канал и иметь возможность передавать полезную нагрузку (?) в качестве параметра, но это не работает.
Да. кто-нибудь знает, как решить эту проблему, или у вас есть предложения для новичков по структурированию кода, отличному от логического POV?

Подробнее здесь: https://stackoverflow.com/questions/784 ... oker-topic
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Невозможно проанализировать сообщения MQTT непосредственно из темы Broker.
    Anonymous » » в форуме JAVA
    0 Ответы
    29 Просмотры
    Последнее сообщение Anonymous
  • Клиент MQTT (MQTTNET) не успешно подписывается на темы, несмотря на подключение к EMQX Broker
    Anonymous » » в форуме C#
    0 Ответы
    10 Просмотры
    Последнее сообщение Anonymous
  • Клиент MQTT (MQTTNET) не успешно подписывается на темы, несмотря на подключение к EMQX Broker
    Anonymous » » в форуме C#
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Запустите MQTT Broker в iOS
    Гость » » в форуме IOS
    0 Ответы
    19 Просмотры
    Последнее сообщение Гость
  • Публикация MQTT Broker продолжает завершаться сбоем, пока мы не перезапустим клиентскую службу.
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»