Как обнаружить ошибку десериализации в Kafka-Spring?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как обнаружить ошибку десериализации в Kafka-Spring?

Сообщение Anonymous »

Я запускаю приложение, использующее сообщения kafka.
Я следил за документацией Spring об обработке ошибок десериализации, чтобы перехватить исключение десериализации. Я попробовал метод errorDeserializationFunction.
Это мой класс конфигурации потребителя

Код: Выделить всё

@Bean
public Map consumerConfigs() {
Map consumerProps = new HashMap();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

/*  Error Handling */
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
consumerProps.put(ErrorHandlingDeserializer2.VALUE_FUNCTION, FailedNTCMessageBodyProvider.class);

return consumerProps;
}

@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs(), new StringDeserializer(),
new JsonDeserializer(NTCMessageBody.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());

return factory;
}
Это поставщик BiFunction

Код: Выделить всё

public class FailedNTCMessageBodyProvider implements BiFunction {

@Override
public NTCMessageBody apply(byte[] t, Headers u) {
return new NTCBadMessageBody(t);
}

}

public class NTCBadMessageBody extends NTCMessageBody{

private final byte[] failedDecode;

public NTCBadMessageBody(byte[] failedDecode) {
this.failedDecode = failedDecode;
}

public byte[] getFailedDecode() {
return this.failedDecode;
}

}

Когда я отправил только одно поврежденное сообщение по теме, я получил следующую ошибку (в цикле):
org.apache.kafka.common.errors .SerializationException: ошибка десериализации ключа/значения
Я понял, что ErrorHandlingDeserializer2 должен делегировать тип NTCBadMessageBody и продолжить использование. Я также видел (в режиме отладки), что он никогда не попадал в конструктор класса NTCBadMessageBody.

Подробнее здесь: https://stackoverflow.com/questions/559 ... fka-spring
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»