Распаковка не происходит после обновления Spring Boot версии 3.3.5.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Распаковка не происходит после обновления Spring Boot версии 3.3.5.

Сообщение Anonymous »

У меня есть два микросервиса (kafka-producer-service и message-service). Служба-производитель Kafka отвечает за создание сообщения в теме «message_text_tpc», тогда как служба сообщений прослушивает ту же тему. Он работал до того, как я обновился до Springboot 3.3.5 с 3.1.8. Вот мои классы конфигурации kafka-producer-service
  • KafkaConfig.java

Код: Выделить всё

package com.ripus.kafkatutorials.application.config;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import com.ripus.kafkatutorials.application.bo.MessageRequest;
import com.ripus.kafkatutorials.application.config.kafka.KafkaProducerInterceptor;

@Configuration
public class KafkaConfig {
@Bean
ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory(producerConfig(), new StringSerializer(),
new KafkaMessageSerializer(true));
}
@Bean
Map producerConfig() {
Map props = new HashMap();
String bootStrapServer = "127.0.0.1:9092";
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 327680);
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, Arrays.asList(KafkaProducerInterceptor.class));
return props;
}

@Bean
KafkaTemplate kafkaTemplate() {
KafkaTemplate kafkaTemplate = new KafkaTemplate(producerFactory());
return kafkaTemplate;
}

}
  • KafkaProducerInterceptor.java

Код: Выделить всё

package com.ripus.kafkatutorials.application.config.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaProducerInterceptor implements ProducerInterceptor {

private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerInterceptor.class);

@Override
public ProducerRecord onSend(ProducerRecord record) {
LOG.info("inside onSend {}", record);
JSONObject msg = new JSONObject(record.value());
LOG.info("{}", msg);
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
LOG.info("Acknowledged :{}", metadata.toString());
}

@Override
public void close() {}
@Override
public void configure(Map configs) {}

}
  • KafkaMessageSerializer.java

Код: Выделить всё

package com.ripus.kafkatutorials.application.config;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaMessageSerializer extends StringSerializer {

private final static Logger LOG = LoggerFactory.getLogger(KafkaMessageSerializer.class);
private boolean isCompressionRequired;

public KafkaMessageSerializer(boolean isCompressionRequired) {
this.isCompressionRequired = isCompressionRequired;
}

@Override
public byte[] serialize(String topic, String data) {
if (isCompressionRequired) {
long startTime = System.currentTimeMillis();
byte[] compressedData = new byte[] {};
try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
GZIPOutputStream gzips = new GZIPOutputStream(bos)) {
gzips.write(data.getBytes());
gzips.finish();  // Ensures all data is compressed
compressedData = bos.toByteArray();
LOG.info("Time Taken for Compression :: {} ms", System.currentTimeMillis() - startTime);
return compressedData;
} catch (IOException e) {
e.printStackTrace();
}
}
return super.serialize(topic, data);
}

}
  • MessageServiceImpl.java

Код: Выделить всё

package com.ripus.kafkatutorials.application.service.impl;

import java.text.MessageFormat;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ripus.kafkatutorials.application.bo.MessageRequest;
import com.ripus.kafkatutorials.application.service.MessageService;

@Service
public class MessageServiceImpl implements MessageService {

private final static Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);

@Autowired
private Environment env;

@Autowired
private KafkaTemplate kafkaMsgTemplate;

@Override
public String postMessage(String request) throws InterruptedException, ExecutionException, JsonProcessingException {
LOGGER.info(MessageFormat.format("Message Sent {0}", request));
CompletableFuture result = kafkaMsgTemplate
.send(env.getProperty("kafka.messageservice.text.topic.name"), request);
LOGGER.info(MessageFormat.format("Message Sent {0}", result.get().toString()));
ObjectMapper mapper = new ObjectMapper();
String resultString = mapper.writeValueAsString(result.get().toString());
return resultString;
}

}
Классы из службы сообщений
  • MessageServiceConfig.java

Код: Выделить всё

package com.ripus.springfox.message.config;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.ripus.springfox.message.model.MessageRequest;

@Configuration
@EnableKafka
public class MessageServiceConfig {

@Autowired
private Environment env;

@Bean
ConcurrentKafkaListenerContainerFactory kafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConcurrency(1);
factory.setConsumerFactory(consumerFactory());
return factory;

}
@Bean
ConsumerFactory

Подробнее здесь: [url]https://stackoverflow.com/questions/79195345/decompression-is-not-happening-after-upgrading-spring-boot-version-3-3-5[/url]
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»