, чтобы JsonDeserializer мог правильно выполнять свою работу.
Если я отправлю недопустимое строковое содержимое в Kafka теме, то появляется следующая ошибка десериализации:
Код: Выделить всё
Error deserializing key/value for partition aaa.bbb.response-0 at offset 2.
If needed, please seek past the record to continue consumption.
Код: Выделить всё
@Bean
public ConsumerFactory consumerConfigs() {
Map configs = new HashMap();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
log.debug("kafka consumer bootstrap addresses: {}", bootstrapAddresses);
configs.forEach((key, value) -> log.debug("kafka consumer configuration: {\"{}\": \"{}\"", key, value));
return new DefaultKafkaConsumerFactory(configs);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerConfigs());
factory.setConcurrency(1);
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(3000);
factory.setCommonErrorHandler(new KafkaConsumerErrorHandler());
return factory;
}
Код: Выделить всё
@KafkaListener(
id = "response-topic-listener",
topics = "${app.kafka.topic.response}",
groupId = "response-group-1",
containerFactory = "kafkaListenerFactory")
public void listen(EventEnvelop message) {
log.info("new incoming message: {}", message);
}
Код: Выделить всё
public class KafkaConsumerErrorHandler implements CommonErrorHandler {
@Override
public boolean handleOne(Exception exception,
ConsumerRecord record,
Consumer consumer,
MessageListenerContainer container) {
return handle(exception, consumer);
}
@Override
public void handleOtherException(Exception exception,
Consumer consumer,
MessageListenerContainer container,
boolean batchListener) {
handle(exception, consumer);
}
private boolean handle(Exception exception, Consumer consumer) {
if (exception instanceof RecordDeserializationException e) {
log.debug("Incoming message: {}", getIncomingMessage(...); --> ??????
log.error("Unable to parse the incoming record. {}", e.getMessage());
consumer.seek(e.topicPartition(), e.offset() + 1L);
consumer.commitSync();
} else {
log.error("An unexpected error occurred while trying to handle the incoming message: ", exception);
}
return false;
}
}
+1
Мой код всегда вызывает handleOtherException, а метод handleOne никогда не вызывается.
+2
Для меня важен порядок сообщений, поэтому я использую setConcurrency(1) и setBatchListener(false) в моей конфигурации ConcurrentKafkaListenerContainerFactory.
Можно ли как-то получить исходное входящее сообщение? Как реализовать метод getIncomingMessage(...) в моем коде?
Подробнее здесь: https://stackoverflow.com/questions/791 ... ng-message
Мобильная версия