- Мой слушатель обработал 50 сообщений из общего числа. опрашивается как 100, а затем только 50 были зафиксированы, а остальные с 51 по 100 не зафиксированы.
- И я начал обрабатывать следующий набор сообщений от 101 и зафиксирован до 200.
- Через какое-то время мой потребитель получит сообщения с 51 по 100, поскольку я их не фиксировал?
Я попробовал подход ниже. Опрос сообщений в пакетном режиме и фиксация этих пакетов записей на основе результатов моей обработки.
@KafkaListener(id="${listenerID}",topics = "${consumer.topic}", containerFactory = "listenerContainerFactory",autoStartup ="${isListenerEnabled}")
public void messageListener(List list,Consumer consumer) {
try {
Map offsetAndMetadataMap=processMessage(list, acknowledgment,consumer); //Here I've logic which validates batch of records and gives result for all the processed records.
consumer.commitSync(offsetAndMetadataMap);
}
catch (Exception e){
}
}
public ConcurrentKafkaListenerContainerFactory listenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setAutoStartup(false);
factory.setConcurrency(consumerProperties.getConsumerThreads());
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000,2)));
if(!consumerProperties.isAutoCommit()) {
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
}
return factory;
}
public ConsumerFactory consumerFactory() {
Map properties = new HashMap();
try {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStarpServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
return new DefaultKafkaConsumerFactory(properties);
}
Подробнее здесь: https://stackoverflow.com/questions/784 ... s-with-aut