Как прослушиватель Kafka повторно доставляет сообщения, когда начинает получать сообщения с автоматической фиксацией, усJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как прослушиватель Kafka повторно доставляет сообщения, когда начинает получать сообщения с автоматической фиксацией, ус

Сообщение Anonymous »

Предположим, я запустил своего потребителя с автоматической фиксацией, установленной на false, и потребитель начал прослушивать сообщения.
  • Мой слушатель обработал 50 сообщений из общего числа. опрашивается как 100, а затем только 50 были зафиксированы, а остальные с 51 по 100 не зафиксированы.
  • И я начал обрабатывать следующий набор сообщений от 101 и зафиксирован до 200.
  • Через какое-то время мой потребитель получит сообщения с 51 по 100, поскольку я их не фиксировал?
Как будет вести себя мой потребитель Kafka с этим вариантом использования??
Я попробовал подход ниже. Опрос сообщений в пакетном режиме и фиксация этих пакетов записей на основе результатов моей обработки.
@KafkaListener(id="${listenerID}",topics = "${consumer.topic}", containerFactory = "listenerContainerFactory",autoStartup ="${isListenerEnabled}")
public void messageListener(List list,Consumer consumer) {
try {
Map offsetAndMetadataMap=processMessage(list, acknowledgment,consumer); //Here I've logic which validates batch of records and gives result for all the processed records.
consumer.commitSync(offsetAndMetadataMap);
}
catch (Exception e){
}
}

public ConcurrentKafkaListenerContainerFactory listenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setAutoStartup(false);
factory.setConcurrency(consumerProperties.getConsumerThreads());
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000,2)));

if(!consumerProperties.isAutoCommit()) {
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
}
return factory;
}

public ConsumerFactory consumerFactory() {
Map properties = new HashMap();
try {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStarpServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);

return new DefaultKafkaConsumerFactory(properties);
}


Подробнее здесь: https://stackoverflow.com/questions/784 ... s-with-aut
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»