Теперь проблема в том, что после каждой повторной попытки я вижу stackTrace исключения, которое в идеале не должно регистрироваться (или, если мне что-то не хватает, укажите).
Мой заводской код:
Код: Выделить всё
public ConcurrentKafkaListenerContainerFactory createDlqContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
FixedBackOff fixedBackOff = new FixedBackOff(dlqRetryInterval, dlqMaxAttempts);
ConsumerRecordRecoverer recovery = (record, ex) -> {
log.error("Final retry failed for DLQ record: topic={}, partition={}, offset={}",
record.topic(), record.partition(), record.offset(), ex);
};
factory.setCommonErrorHandler(new DefaultErrorHandler(recovery, fixedBackOff));
return factory;
}
Код: Выделить всё
2024-12-25 11:31:57,003 ERROR[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] , KafkaMessageListenerContainer - Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:227)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713)
at io.micrometer.observation.Observation.observe(Observation.java:565)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.transaction.processing.service.consumer.StockEventConsumer.consume(com.transaction.processing.service.model.eventmetadata.EventMetaData)' threw exception
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701)
... 10 common frames omitted
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:435)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800)
Caused by: java.lang.RuntimeException: My Exception Message
Подробнее здесь: https://stackoverflow.com/questions/793 ... -exception
Мобильная версия