- KafkaConfig.java
Code: Select all
package com.ripus.kafkatutorials.application.config;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import com.ripus.kafkatutorials.application.bo.MessageRequest;
import com.ripus.kafkatutorials.application.config.kafka.KafkaProducerInterceptor;
@Configuration
public class KafkaConfig {

    /**
     * Producer factory wired with a plain String key serializer and a
     * gzip-compressing value serializer (compression enabled via the
     * constructor flag).
     */
    @Bean
    ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig(), new StringSerializer(),
                new KafkaMessageSerializer(true));
    }

    /**
     * Base producer configuration: bootstrap server, batch size and the
     * logging interceptor.
     */
    @Bean
    Map<String, Object> producerConfig() {
        Map<String, Object> props = new HashMap<>();
        // TODO(review): externalize to application properties instead of hard-coding
        String bootStrapServer = "127.0.0.1:9092";
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 327680); // 320 KB batches
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                Arrays.asList(KafkaProducerInterceptor.class));
        return props;
    }

    /** Template used by services to publish String messages. */
    @Bean
    KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
- KafkaProducerInterceptor.java
Code: Select all
package com.ripus.kafkatutorials.application.config.kafka;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaProducerInterceptor implements ProducerInterceptor<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerInterceptor.class);

    /**
     * Logs the outgoing record and, when the payload is valid JSON, its parsed
     * form. Never mutates the record. A payload that is not valid JSON must not
     * abort the send, so the parse is guarded.
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        LOG.info("inside onSend {}", record);
        try {
            JSONObject msg = new JSONObject(record.value());
            LOG.info("{}", msg);
        } catch (RuntimeException e) {
            // Logging-only parse; the record is still sent as-is.
            LOG.warn("Record value is not valid JSON: {}", e.getMessage());
        }
        return record;
    }

    /**
     * Called once per record on broker ack or failure. Per the Kafka contract,
     * {@code metadata} may be null (or partial) when {@code exception} is
     * non-null, so the failure path must not dereference it blindly.
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            LOG.error("Send failed, metadata: {}", metadata, exception);
        } else {
            LOG.info("Acknowledged :{}", metadata);
        }
    }

    @Override
    public void close() {
        // No resources to release.
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration required.
    }
}
- KafkaMessageSerializer.java
Code: Select all
package com.ripus.kafkatutorials.application.config;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KafkaMessageSerializer extends StringSerializer {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerializer.class);

    /** When true, payloads are gzip-compressed before being handed to Kafka. */
    private final boolean isCompressionRequired;

    public KafkaMessageSerializer(boolean isCompressionRequired) {
        this.isCompressionRequired = isCompressionRequired;
    }

    /**
     * Serializes {@code data}, gzip-compressing it when compression is enabled.
     *
     * @param topic topic the record is destined for (unused by compression)
     * @param data  payload; null is delegated to {@link StringSerializer}
     *              (which yields null) instead of NPE-ing on {@code length()}
     * @return gzip-compressed UTF-8 bytes, or the plain StringSerializer output
     *         when compression is disabled
     * @throws IllegalStateException if compression fails — silently falling back
     *         to uncompressed bytes would hand consumers a payload they cannot
     *         gunzip
     */
    @Override
    public byte[] serialize(String topic, String data) {
        if (isCompressionRequired && data != null) {
            long startTime = System.currentTimeMillis();
            try (ByteArrayOutputStream bos = new ByteArrayOutputStream(data.length());
                    GZIPOutputStream gzips = new GZIPOutputStream(bos)) {
                // Explicit UTF-8, matching StringSerializer's default encoding,
                // instead of the platform-dependent getBytes().
                gzips.write(data.getBytes("UTF-8"));
                gzips.finish(); // flush all remaining compressed bytes into bos
                byte[] compressedData = bos.toByteArray();
                LOG.info("Time Taken for Compression :: {} ms",
                        System.currentTimeMillis() - startTime);
                return compressedData;
            } catch (IOException e) {
                throw new IllegalStateException(
                        "Failed to gzip message for topic " + topic, e);
            }
        }
        return super.serialize(topic, data);
    }
}
- MessageServiceImpl.java
Code: Select all
package com.ripus.kafkatutorials.application.service.impl;
import java.text.MessageFormat;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.http.HttpStatus;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ripus.kafkatutorials.application.bo.MessageRequest;
import com.ripus.kafkatutorials.application.service.MessageService;
@Service
public class MessageServiceImpl implements MessageService {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageServiceImpl.class);

    // ObjectMapper is thread-safe and expensive to build; share one instance.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Autowired
    private Environment env;

    @Autowired
    private KafkaTemplate<String, String> kafkaMsgTemplate;

    /**
     * Publishes the raw request to the topic named by
     * {@code kafka.messageservice.text.topic.name} and returns the broker's
     * send result (its {@code toString()}) encoded as a JSON string.
     *
     * @throws InterruptedException    if interrupted while awaiting the ack
     * @throws ExecutionException     if the send fails
     * @throws JsonProcessingException if encoding the result fails
     */
    @Override
    public String postMessage(String request)
            throws InterruptedException, ExecutionException, JsonProcessingException {
        // SLF4J parameterized logging instead of eager MessageFormat.format.
        LOGGER.info("Message Sent {}", request);
        CompletableFuture<SendResult<String, String>> result = kafkaMsgTemplate
                .send(env.getProperty("kafka.messageservice.text.topic.name"), request);
        // Block once for the acknowledgement instead of calling get() twice.
        String sendResult = result.get().toString();
        LOGGER.info("Message Sent {}", sendResult);
        return MAPPER.writeValueAsString(sendResult);
    }
}
- MessageServiceConfig.java
Code: Select all
package com.ripus.springfox.message.config;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import com.ripus.springfox.message.model.MessageRequest;
@Configuration
@EnableKafka
public class MessageServiceConfig {
@Autowired
private Environment env;
/**
 * Listener container factory for String records, backed by
 * {@code consumerFactory()}; a single consumer thread is configured.
 */
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(1); // one consumer thread
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
@Bean
ConsumerFactory
More details here: [url]https://stackoverflow.com/questions/79195345/decompression-is-not-happening-after-upgrading-spring-boot-version-3-3-5[/url]
Mobile version