DefaultErrorHandler для обработки исключения таймаута для потребителя KafkaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 DefaultErrorHandler для обработки исключения таймаута для потребителя Kafka

Сообщение Anonymous »

Я столкнулся с исключением тайм-аута по разным причинам с моим потребителем Spring Kafka. Я использую следующие конфигурации для определения моей потребительской фабрики с обработчиком ошибок.

Код: Выделить всё

@Bean
public ConcurrentKafkaListenerContainerFactory createListenerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(createErrorHandler());
return factory;
}

private ConsumerFactory consumerFactory() {
Map props = new HashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);

return new DefaultKafkaConsumerFactory(props);
}

public DefaultErrorHandler createErrorHandler() {
BackOff fixedBackOff = new FixedBackOff(3000, 0);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
log.info("Exception occured: {}", exception);
}, fixedBackOff);
errorHandler.addNotRetryableExceptions(UnknownTopicOrPartitionException.class,
OffsetOutOfRangeException.class, NetworkException.class,
SerializationException.class, DeserializationException.class);
return errorHandler;
}
Однако, несмотря на наличие обработчика ошибок, в котором я хочу эффективно обрабатывать тайм-аут, отправляя электронную почту или что-то еще, в моих журналах я получаю эту ошибку.

Код: Выделить всё

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
Вопросы:
  • Является ли TimeoutException исключением повторной попытки? Безопасно ли повторять их?
  • Можно ли обработать указанные выше исключения с помощью обработчика ошибок?
  • Если да, то как эффективно обрабатывать эти исключения с помощью обработчика ошибок (SeekToCurrentErrorHandler или DefaultErrorHandler)?
  • Почему Kafka определяет определенное исключение как предупреждение, а не как ошибку, как показано ниже

    Код: Выделить всё

    2024-06-25T21:42:31.693+05:30  WARN 22880 --- [retry-kafka] [  test-id-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Error while fetching metadata with correlation id 263 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION} 
Я нашел здесь аналогичный вопрос, но это часть производителя Kafka, я ожидаю, что обрабатывать эти исключения в обработчике ошибок
  • Можно ли иметь обработчик ошибок, позволяющий производителю повторять отправку сообщения при определенных исключениях?
Я пытался определить исключения в обработчике ошибок, но TimeoutException или UnknownTopicOrPartitionException не отправляются в обработчик ошибок, а просто регистрируются в моей консоли.
Я использую Java 19, Kafka 3.7.0, Spring-Kafka 3.1.5
несколько других ссылок, которые я пробовал: 1 2

Подробнее здесь: https://stackoverflow.com/questions/786 ... a-consumer
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»