Допустим, предположим, что я основал своего потребителя с автоматической коммиты, установленной для ложному, и потребителя начал слушать сообщения. /> Через некоторое время мой потребитель получит сообщения от 51 до 100, так как я не совершал их? Опрос сообщений в партии и совершать эти партии записей на основе моего результата обработки. < /P>
@KafkaListener(id="${listenerID}",topics = "${consumer.topic}", containerFactory = "listenerContainerFactory",autoStartup ="${isListenerEnabled}")
public void messageListener(List list,Consumer consumer) {
try {
Map offsetAndMetadataMap=processMessage(list, acknowledgment,consumer); //Here I've logic which validates batch of records and gives result for all the processed records.
consumer.commitSync(offsetAndMetadataMap);
}
catch (Exception e){
}
}
public ConcurrentKafkaListenerContainerFactory listenerContainerFactory()
{
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setAutoStartup(false);
factory.setConcurrency(consumerProperties.getConsumerThreads());
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000,2)));
if(!consumerProperties.isAutoCommit()) {
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
}
return factory;
}
public ConsumerFactory consumerFactory() {
Map properties = new HashMap();
try {
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootStarpServer);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,autoCommit);
return new DefaultKafkaConsumerFactory(properties);
}
Подробнее здесь: https://stackoverflow.com/questions/784 ... s-with-aut
Как Kafka Slireeer RE доставляет сообщения, когда начинает потребление сообщений с помощью Auto Commit, установленного н ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
KAFKA Commit после того, как убедится, что все сообщения фактически обрабатывались
Anonymous » » в форуме C# - 0 Ответы
- 7 Просмотры
-
Последнее сообщение Anonymous
-