Исключение десериализации ключа/значения Kafka: запишите исходное входящее сообщениеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Исключение десериализации ключа/значения Kafka: запишите исходное входящее сообщение

Сообщение Anonymous »

Я изучаю, как Kafka обрабатывает исключения. У меня есть потребитель сообщений, который ожидает входящие данные JSON
, чтобы JsonDeserializer мог правильно выполнять свою работу.
Если я отправлю недопустимое строковое содержимое в Kafka теме, то появляется следующая ошибка десериализации:

Код: Выделить всё

Error deserializing key/value for partition aaa.bbb.response-0 at offset 2.
If needed, please seek past the record to continue consumption.
Это здорово. Я ловлю это и записываю следующим образом:

Код: Выделить всё

@Bean
public ConsumerFactory consumerConfigs() {
Map configs = new HashMap();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddresses);
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

log.debug("kafka consumer bootstrap addresses: {}", bootstrapAddresses);
configs.forEach((key, value) -> log.debug("kafka consumer configuration: {\"{}\": \"{}\"", key, value));

return new DefaultKafkaConsumerFactory(configs);
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerFactory() {

ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerConfigs());
factory.setConcurrency(1);
factory.setBatchListener(false);
factory.getContainerProperties().setPollTimeout(3000);
factory.setCommonErrorHandler(new KafkaConsumerErrorHandler());
return factory;
}
Мой потребитель сообщений выглядит следующим образом:

Код: Выделить всё

@KafkaListener(
id = "response-topic-listener",
topics = "${app.kafka.topic.response}",
groupId = "response-group-1",
containerFactory = "kafkaListenerFactory")
public void listen(EventEnvelop message) {
log.info("new incoming message: {}", message);
}
Вот как я обнаруживаю ошибку сериализации:

Код: Выделить всё

public class KafkaConsumerErrorHandler implements CommonErrorHandler {

@Override
public boolean handleOne(Exception exception,
ConsumerRecord record,
Consumer consumer,
MessageListenerContainer container) {
return handle(exception, consumer);
}

@Override
public void handleOtherException(Exception exception,
Consumer consumer,
MessageListenerContainer container,
boolean batchListener) {
handle(exception, consumer);
}

private boolean handle(Exception exception, Consumer consumer) {
if (exception instanceof RecordDeserializationException e) {
log.debug("Incoming message: {}", getIncomingMessage(...); --> ??????
log.error("Unable to parse the incoming record. {}", e.getMessage());
consumer.seek(e.topicPartition(), e.offset() + 1L);
consumer.commitSync();
} else {
log.error("An unexpected error occurred while trying to handle the incoming message: ", exception);
}
return false;
}
}
Я хочу зарегистрировать исходное входящее сообщение, которое синтаксическому анализатору не удалось проанализировать. Я пытался найти это, а также исследовать объекты-потребители и контейнеры в режиме отладки, но безуспешно.
+1
Мой код всегда вызывает handleOtherException, а метод handleOne никогда не вызывается.
+2
Для меня важен порядок сообщений, поэтому я использую setConcurrency(1) и setBatchListener(false) в моей конфигурации ConcurrentKafkaListenerContainerFactory.
Можно ли как-то получить исходное входящее сообщение? Как реализовать метод getIncomingMessage(...) в моем коде?

Подробнее здесь: https://stackoverflow.com/questions/791 ... ng-message
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»