Kafka DefaultErrorHandler Перейти к текущему состоянию после исключенияJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Kafka DefaultErrorHandler Перейти к текущему состоянию после исключения

Сообщение Anonymous »

В моем потребителе весенней загрузки с использованием kafka я настроил DefaultErrorHandler на повтор неудачного события пару раз.
Теперь проблема в том, что после каждой повторной попытки я вижу stackTrace исключения, которое в идеале не должно регистрироваться (или, если мне что-то не хватает, укажите).
Мой заводской код:

Код: Выделить всё

    public  ConcurrentKafkaListenerContainerFactory createDlqContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
FixedBackOff fixedBackOff = new FixedBackOff(dlqRetryInterval, dlqMaxAttempts);
ConsumerRecordRecoverer recovery = (record, ex) -> {
log.error("Final retry failed for DLQ record: topic={}, partition={}, offset={}",
record.topic(), record.partition(), record.offset(), ex);
};
factory.setCommonErrorHandler(new DefaultErrorHandler(recovery, fixedBackOff));
return factory;
}
В идеале мне нужен только журнал ошибок финальной повторной попытки, но вместо этого я получаю трассировку стека после каждой повторной попытки.

Код: Выделить всё

    2024-12-25 11:31:57,003 ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] , KafkaMessageListenerContainer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:227)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713)
at io.micrometer.observation.Observation.observe(Observation.java:565)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.transaction.processing.service.consumer.StockEventConsumer.consume(com.transaction.processing.service.model.eventmetadata.EventMetaData)' threw exception
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701)
...  10 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800)
Caused by: java.lang.RuntimeException: My Exception Message
Есть ли какое-нибудь решение, позволяющее мне справиться с этим

Подробнее здесь: https://stackoverflow.com/questions/793 ... -exception
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»