Я следил за документацией Spring об обработке ошибок десериализации, чтобы перехватить исключение десериализации. Я попробовал метод errorDeserializationFunction.
Это мой класс конфигурации потребителя
Код: Выделить всё
@Bean
public Map consumerConfigs() {
Map consumerProps = new HashMap();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
/* Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);
return consumerProps;
}
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer(NTCMessageBody.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
Код: Выделить всё
public class FailedNTCMessageBodyProvider implements BiFunction {
@Override
public NTCMessageBody apply(byte[] t, Headers u) {
return new NTCBadMessageBody(t);
}
}
public class NTCBadMessageBody extends NTCMessageBody{
private final byte[] failedDecode;
public NTCBadMessageBody(byte[] failedDecode) {
this.failedDecode = failedDecode;
}
public byte[] getFailedDecode() {
return this.failedDecode;
}
}
org.apache.kafka.common.errors .SerializationException: ошибка десериализации ключа/значения
Я понял, что ErrorHandlingDeserializer2 должен делегировать тип NTCBadMessageBody и продолжить использование. Я также видел (в режиме отладки), что он никогда не попадал в конструктор класса NTCBadMessageBody.
Подробнее здесь: https://stackoverflow.com/questions/559 ... fka-spring
Мобильная версия