Код: Выделить всё
@Bean
public ConcurrentKafkaListenerContainerFactory createListenerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(createErrorHandler());
return factory;
}
private ConsumerFactory consumerFactory() {
Map props = new HashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 100);
return new DefaultKafkaConsumerFactory(props);
}
public DefaultErrorHandler createErrorHandler() {
BackOff fixedBackOff = new FixedBackOff(3000, 0);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((consumerRecord, exception) -> {
log.info("Exception occured: {}", exception);
}, fixedBackOff);
errorHandler.addNotRetryableExceptions(UnknownTopicOrPartitionException.class,
OffsetOutOfRangeException.class, NetworkException.class,
SerializationException.class, DeserializationException.class);
return errorHandler;
}
Код: Выделить всё
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
- Является ли TimeoutException исключением повторной попытки? Безопасно ли повторять их?
- Можно ли обработать указанные выше исключения с помощью обработчика ошибок?
- Если да, то как эффективно обрабатывать эти исключения с помощью обработчика ошибок (SeekToCurrentErrorHandler или DefaultErrorHandler)?
- Почему Kafka определяет определенное исключение как предупреждение, а не как ошибку, как показано ниже
Код: Выделить всё
2024-06-25T21:42:31.693+05:30 WARN 22880 --- [retry-kafka] [ test-id-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-group-id-1, groupId=test-group-id] Error while fetching metadata with correlation id 263 : {my-topic=UNKNOWN_TOPIC_OR_PARTITION}
- Можно ли иметь обработчик ошибок, позволяющий производителю повторять отправку сообщения при определенных исключениях?
Я использую Java 19, Kafka 3.7.0, Spring-Kafka 3.1.5
несколько других ссылок, которые я пробовал: 1 2
Подробнее здесь: https://stackoverflow.com/questions/786 ... a-consumer
Мобильная версия