Я новичок в java, а также во всей spring framework. Я пытаюсь разработать приложение в SpringBoot для использования в контексте MQTT.Приложение состоит из двух микросервисов: первый получает данные от брокера и сохраняет их в базе данных, а второй предоставляет конечные точки, позволяющие пользователям получать и выполнять операции с определенными данными. Проблема заключается в первом микросервисе DataRetriever, который занимается чтением сообщений о погоде (написанных в формате json) из теме (данные о погоде) в брокере Mosquitto и контекстно анализируйте их, чтобы затем сохранить поля json в соответствующие таблицы в Postgres.
По сути, я не могу найти правильный способ передачи полезных данных сообщения из темы в Object Mapper.
Вся логика, которую я построил, основана на двух файлах:
, который должен анализировать сообщения, написанные в теме брокера, сопоставлять их (с помощью objectMapper) с тем, что написано в моем CityEntity, а также создавать и заполнять таблицы в postgres (данными, которые он прочитал во время анализ полей json)
Проблема заключается в том, что, как видно из кода, написанного в StartupUtility
package com.example.demo.startup;
import com.example.demo.entities.CityEntity;
import com.example.demo.repos.CityWeatherRepo;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
// Implementing CommandLineRunnerInterface to be init when proj is created
@Component
@Log
public class StartupUtility implements CommandLineRunner {
// Passing the json as a value to test the parsing logic
@Value("${demo.json.string}") private String json;
// Wiring the Inbound ch
@Autowired
private MessageChannel mqttInputChannel;
// Wiring the Repo
@Autowired private CityWeatherRepo repo;
@Override
public void run(String... args) throws Exception {
// Init Obj Mapper instance
ObjectMapper mapper = new ObjectMapper();
// Avoiding failure in case of unrecognized fields during json mapping
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// Getting the json + Converting into the City Class
CityEntity value = mapper.readValue(json, CityEntity.class); // replace json with payload // input: json in string format, output: OpenAPI/CityEntity (class)
// Saving
CityEntity save = repo.save(value);
// Checking the Saving process
log.info(" Entity info " + save.toString());
}
}
json, который я собираюсь проанализировать, жестко закодирован в выделенной переменной "demo.json.string", записанной в файле application.properties. (я сделал это, чтобы проверить, верна ли хотя бы логика синтаксического анализа, и это так), при этом я хочу, чтобы jsons были теми, которые написаны в теме (погодные данные), вот так: р>
Моя идея заключалась в том, чтобы внедрить в StartupUtility канал и иметь возможность передавать полезную нагрузку (?) в качестве параметра, но это не работает.
Да. кто-нибудь знает, как решить эту проблему, или у вас есть предложения для новичков по структурированию кода, отличному от логического POV?
Я новичок в [b]java[/b], а также во всей [b]spring framework[/b]. Я пытаюсь разработать приложение в [b]SpringBoot[/b] для использования в контексте [b]MQTT[/b].Приложение состоит из двух микросервисов: первый получает данные от брокера и сохраняет их в базе данных, а второй предоставляет конечные точки, позволяющие пользователям получать и выполнять операции с определенными данными. [b]Проблема[/b] заключается в первом микросервисе DataRetriever, который занимается чтением сообщений о погоде (написанных в формате json) из теме (данные о погоде) в брокере [b]Mosquitto[/b] и контекстно анализируйте их, чтобы затем сохранить поля json в соответствующие таблицы в Postgres. По сути, я не могу найти правильный способ передачи полезных данных сообщения из темы в Object Mapper. Вся логика, которую я построил, основана на двух файлах: [list] [*][code]MqttBeans[/code] там, где я управляю логикой соединения, входящей и исходящей из каналов брокера, используется следующий код: [/list] [code]package com.example.demo.configs;
// Inbound Ch (Subscribing) @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); }
@Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("serverIn", mqttClientFactory(),"#"); adapter.setCompletionTimeout(5000); adapter.setConverter(new DefaultPahoMessageConverter()); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel());
return adapter; }
// Msg Handler @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException {
// Topic String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString(); // retrieving the topic from the message header if (topic.equals("weather-data")) { System.out.println("Here's the topic: " + topic); // printing out the topic } // Payload String payload = message.getPayload().toString(); System.out.println("Here's the payload: " + payload); // printing out any msg that comes in the ch } }; }
// Outbound Ch (Publishing) @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }
// Msg Handler @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("serverOut", mqttClientFactory()); messageHandler.setAsync(true); // so that the client will always be up and listening messageHandler.setDefaultTopic("weather-data"); messageHandler.setDefaultRetained(false);
return messageHandler; } }
[/code] [list] [*][code]StartupUtility[/code], который должен анализировать сообщения, написанные в теме брокера, сопоставлять их (с помощью objectMapper) с тем, что написано в моем CityEntity, а также создавать и заполнять таблицы в postgres (данными, которые он прочитал во время анализ полей json) [/list] [b]Проблема[/b] заключается в том, что, как видно из кода, написанного в StartupUtility [code]package com.example.demo.startup;
// Implementing CommandLineRunnerInterface to be init when proj is created @Component @Log public class StartupUtility implements CommandLineRunner {
// Passing the json as a value to test the parsing logic @Value("${demo.json.string}") private String json;
// Wiring the Inbound ch @Autowired private MessageChannel mqttInputChannel;
// Wiring the Repo @Autowired private CityWeatherRepo repo;
@Override public void run(String... args) throws Exception {
// Init Obj Mapper instance ObjectMapper mapper = new ObjectMapper();
// Avoiding failure in case of unrecognized fields during json mapping mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
// Getting the json + Converting into the City Class CityEntity value = mapper.readValue(json, CityEntity.class); // replace json with payload // input: json in string format, output: OpenAPI/CityEntity (class)
// Saving CityEntity save = repo.save(value);
// Checking the Saving process log.info(" Entity info " + save.toString()); } } [/code] json, который я собираюсь проанализировать, жестко закодирован в выделенной переменной "demo.json.string", записанной в файле application.properties. (я сделал это, чтобы проверить, верна ли хотя бы логика синтаксического анализа, и это так), при этом я хочу, чтобы jsons были теми, которые написаны в теме (погодные данные), вот так: р> [code]{"coord":{"lon":-0.1257,"lat":51.5085},"weather":[{"id":802,"main":"Clouds","description":"scattered clouds","icon":"03d"}],"base":"stations","main":{"temp":291.47,"feels_like":290.74,"temp_min":289.18,"temp_max":292.64,"pressure":1009,"humidity":53},"visibility":10000,"wind":{"speed":5.14,"deg":140},"clouds":{"all":40},"dt":1714483329,"sys":{"type":2,"id":2075535,"country":"GB","sunrise":1714451607,"sunset":1714504902},"timezone":3600,"id":2643743,"name":"London","cod":200} {"coord":{"lon":2.3488,"lat":48.8534},"weather":[{"id":804,"main":"Clouds","description":"overcast clouds","icon":"04d"}],"base":"stations","main":{"temp":291.56,"feels_like":291.21,"temp_min":290.03,"temp_max":292.64,"pressure":1012,"humidity":67},"visibility":10000,"wind":{"speed":6.69,"deg":140},"clouds":{"all":100},"dt":1714483519,"sys":{"type":2,"id":2012208,"country":"FR","sunrise":1714451474,"sunset":1714503847},"timezone":7200,"id":2988507,"name":"Paris","cod":200} {"coord":{"lon":-85.1647,"lat":34.257},"weather":[{"id":502,"main":"Rain","description":"heavy intensity rain","icon":"10d"}],"base":"stations","main":{"temp":289.27,"feels_like":289.29,"temp_min":288.15,"temp_max":290.93,"pressure":1018,"humidity":90},"visibility":10000,"wind":{"speed":1.54,"deg":20},"rain":{"1h":5.05},"clouds":{"all":100},"dt":1714483081,"sys":{"type":2,"id":2038061,"country":"US","sunrise":1714474288,"sunset":1714523036},"timezone":-14400,"id":4219762,"name":"Rome","cod":200} {"coord":{"lon":13.4105,"lat":52.5244},"weather":[{"id":800,"main":"Clear","description":"clear sky","icon":"01d"}],"base":"stations","main":{"temp":301.22,"feels_like":300.5,"temp_min":299.81,"temp_max":302.53,"pressure":1004,"humidity":34},"visibility":10000,"wind":{"speed":5.66,"deg":150},"clouds":{"all":0},"dt":1714483328,"sys":{"type":2,"id":2011538,"country":"DE","sunrise":1714448171,"sunset":1714501842},"timezone":7200,"id":2950159,"name":"Berlin","cod":200} {"coord":{"lon":-74.006,"lat":40.7143},"weather":[{"id":701,"main":"Mist","description":"mist","icon":"50d"}],"base":"stations","main":{"temp":286.43,"feels_like":286.14,"temp_min":284.84,"temp_max":287.62,"pressure":1015,"humidity":89},"visibility":9656,"wind":{"speed":7.2,"deg":50},"clouds":{"all":100},"dt":1714483436,"sys":{"type":2,"id":2008101,"country":"US","sunrise":1714470912,"sunset":1714521057},"timezone":-14400,"id":5128581,"name":"New York","cod":200} [/code] Моя идея заключалась в том, чтобы внедрить в StartupUtility канал и иметь возможность передавать полезную нагрузку (?) в качестве параметра, но это не работает. Да. кто-нибудь знает, как решить эту проблему, или у вас есть предложения для новичков по структурированию кода, отличному от логического POV?
Я новичок в java , а также во всей spring framework . Я пытаюсь разработать приложение в SpringBoot для использования в контексте MQTT .Приложение состоит из двух микросервисов: первый получает данные от брокера и сохраняет их в базе данных, а...
Я строю сервис .NET, который подключается к брокеру EMQX с использованием MQTTnet. В то время как клиент подключается к EMQX, когда я тестирую отправку сообщений через MQTTX (клиент MQTT Client Tool), моя служба никогда не получает их. Похоже, что...
Я строю сервис .NET, который подключается к брокеру EMQX с использованием MQTTnet. В то время как клиент подключается к EMQX, когда я тестирую отправку сообщений через MQTTX (клиент MQTT Client Tool), моя служба никогда не получает их. Похоже, что...
Я разрабатываю приложение, которое собирает данные от некоторых домашних датчиков к первому брокеру Mosquitto, работающему на Raspberry PI, и соединенному со вторым облачным брокером MQTT.
Я хочу разработать то же самое для приложения для iOS,...
Мы используем MQTT-брокер в Kubernetes.
Через некоторое время при публикации начинается сбой, и он продолжает сбой, пока мы не перезапустим клиентскую службу
Версия протокола MQTT, которую мы используем, — MQTTv5.
Код подключения:
def...