у нас есть потребительская логика, как показано ниже. Мы пытаемся обрабатывать несколько сообщений одновременно, с ограничением в одно сообщение на раздел.
void consumerMessage(){
while (!consumerService.isStopped(topic, server)) {
processBatch(consumer,connection,topic,server);
}}
}
private void processBatch(Consumer consumer, Connection connection, String topic,String server) throws Exception {
Instant pollStartTime = Instant.now();
Map topicPartitionMap = new HashMap();
Map partitionOffsetMap = new HashMap();
boolean isSuccess;
int messageCount = 0;
while (Duration.between(pollStartTime, Instant.now()).compareTo(Duration.ofMinutes(MAX_POLL_DURATION)) < 0 && MAX_BATCH_SIZE > messageCount) {
ConsumerRecords consumerRecords = consumer.poll(10000);
if (consumerRecords.isEmpty()) {
continue; // Skip the iteration if there are no records
}
List consumedPartition;
for (ConsumerRecord record : consumerRecords) {
if (topicPartitionMap.containsKey(record.topic()) && topicPartitionMap.get(record.topic()).contains(record.partition())) {
continue;
}
consumedPartition = topicPartitionMap.containsKey(record.topic()) ? topicPartitionMap.get(record.topic()):new ArrayList();
messages.add(objectMapper.readValue(record.value(), X.class));
messageCount++;
consumedPartition.add(record.partition());
topicPartitionMap.put(record.topic(),consumedPartition);
partitionOffsetMap.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()+1, "processed"));
log.info("partition {}, offset{} , topic {}",record.partition(),record.offset(),record.topic());
}
}
if (messageCount > 0) {
isSuccess= upload(messages,connection,topic,server);
if(isSuccess) {
log.info("Consumer commit for topic {} and size {}",topic,messages.size());
consumer.commitSync(partitionOffsetMap);
resetOffset(partitionOffsetMap,consumer,0);
}else{
resetOffset(partitionOffsetMap,consumer,-1);
}
} else {
log.info("No messages received in the last 1 minute.");
}
}
private void resetOffset(Map partitionOffsetMap, Consumer consumer,int offsetAddition){
Set partitions = partitionOffsetMap.keySet();
Map endOffsets = consumer.endOffsets(partitions);
for (Map.Entry entry : partitionOffsetMap.entrySet()) {
TopicPartition partition = entry.getKey();
OffsetAndMetadata minOffset = entry.getValue();
long newOffset = minOffset.offset() + offsetAddition;
long lastOffset = endOffsets.get(partition);
log.info("consumer manifest partition -{} seek to--- {} and end is {}",partition,newOffset,lastOffset);
if(lastOffset>=newOffset) consumer.seek(partition, newOffset);
}
}
Иногда после успешного использования сообщения указатель поиска перемещается к последнему сообщению, хотя журнал поиска печатается правильно в журналах.
Я делаю здесь что-то не так?
у нас есть потребительская логика, как показано ниже. Мы пытаемся обрабатывать несколько сообщений одновременно, с ограничением в одно сообщение на раздел. [code] void consumerMessage(){ while (!consumerService.isStopped(topic, server)) { processBatch(consumer,connection,topic,server); }}
}
private void processBatch(Consumer consumer, Connection connection, String topic,String server) throws Exception { Instant pollStartTime = Instant.now(); Map topicPartitionMap = new HashMap(); Map partitionOffsetMap = new HashMap(); boolean isSuccess; int messageCount = 0; while (Duration.between(pollStartTime, Instant.now()).compareTo(Duration.ofMinutes(MAX_POLL_DURATION)) < 0 && MAX_BATCH_SIZE > messageCount) { ConsumerRecords consumerRecords = consumer.poll(10000); if (consumerRecords.isEmpty()) { continue; // Skip the iteration if there are no records } List consumedPartition; for (ConsumerRecord record : consumerRecords) { if (topicPartitionMap.containsKey(record.topic()) && topicPartitionMap.get(record.topic()).contains(record.partition())) { continue; } consumedPartition = topicPartitionMap.containsKey(record.topic()) ? topicPartitionMap.get(record.topic()):new ArrayList(); messages.add(objectMapper.readValue(record.value(), X.class)); messageCount++; consumedPartition.add(record.partition()); topicPartitionMap.put(record.topic(),consumedPartition); partitionOffsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "processed")); log.info("partition {}, offset{} , topic {}",record.partition(),record.offset(),record.topic());
} } if (messageCount > 0) { isSuccess= upload(messages,connection,topic,server); if(isSuccess) { log.info("Consumer commit for topic {} and size {}",topic,messages.size()); consumer.commitSync(partitionOffsetMap); resetOffset(partitionOffsetMap,consumer,0); }else{ resetOffset(partitionOffsetMap,consumer,-1); } } else { log.info("No messages received in the last 1 minute."); } }
for (Map.Entry entry : partitionOffsetMap.entrySet()) { TopicPartition partition = entry.getKey(); OffsetAndMetadata minOffset = entry.getValue(); long newOffset = minOffset.offset() + offsetAddition; long lastOffset = endOffsets.get(partition); log.info("consumer manifest partition -{} seek to--- {} and end is {}",partition,newOffset,lastOffset); if(lastOffset>=newOffset) consumer.seek(partition, newOffset); } } [/code] Иногда после успешного использования сообщения указатель поиска перемещается к последнему сообщению, хотя журнал поиска печатается правильно в журналах. Я делаю здесь что-то не так?