Потребитель требует промежуточного сброса до последней версииJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Потребитель требует промежуточного сброса до последней версии

Сообщение Anonymous »

у нас есть потребительская логика, как показано ниже. Мы пытаемся обрабатывать несколько сообщений одновременно, с ограничением в одно сообщение на раздел.

Код: Выделить всё

 void consumerMessage(){
while (!consumerService.isStopped(topic, server)) {
processBatch(consumer,connection,topic,server);
}}

}

private void processBatch(Consumer consumer, Connection connection, String topic,String server) throws Exception {
Instant pollStartTime = Instant.now();
Map topicPartitionMap = new HashMap();
Map partitionOffsetMap = new HashMap();
boolean isSuccess;
int messageCount = 0;
while (Duration.between(pollStartTime, Instant.now()).compareTo(Duration.ofMinutes(MAX_POLL_DURATION)) < 0 && MAX_BATCH_SIZE > messageCount) {
ConsumerRecords consumerRecords = consumer.poll(10000);
if (consumerRecords.isEmpty()) {
continue; // Skip the iteration if there are no records
}
List consumedPartition;
for (ConsumerRecord record : consumerRecords) {
if (topicPartitionMap.containsKey(record.topic()) && topicPartitionMap.get(record.topic()).contains(record.partition())) {
continue;
}
consumedPartition = topicPartitionMap.containsKey(record.topic()) ? topicPartitionMap.get(record.topic()):new ArrayList();
messages.add(objectMapper.readValue(record.value(), X.class));
messageCount++;
consumedPartition.add(record.partition());
topicPartitionMap.put(record.topic(),consumedPartition);
partitionOffsetMap.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()+1, "processed"));
log.info("partition {}, offset{} , topic {}",record.partition(),record.offset(),record.topic());

}
}
if (messageCount > 0) {
isSuccess= upload(messages,connection,topic,server);
if(isSuccess) {
log.info("Consumer commit for topic  {} and size {}",topic,messages.size());
consumer.commitSync(partitionOffsetMap);
resetOffset(partitionOffsetMap,consumer,0);
}else{
resetOffset(partitionOffsetMap,consumer,-1);
}
} else {
log.info("No messages received in the last 1 minute.");
}
}

private void resetOffset(Map partitionOffsetMap, Consumer consumer,int offsetAddition){
Set partitions = partitionOffsetMap.keySet();
Map endOffsets = consumer.endOffsets(partitions);

for (Map.Entry entry : partitionOffsetMap.entrySet()) {
TopicPartition partition = entry.getKey();
OffsetAndMetadata minOffset = entry.getValue();
long newOffset = minOffset.offset() + offsetAddition;
long lastOffset = endOffsets.get(partition);
log.info("consumer manifest partition -{} seek to--- {} and end is {}",partition,newOffset,lastOffset);
if(lastOffset>=newOffset) consumer.seek(partition, newOffset);
}
}
Иногда после успешного использования сообщения указатель поиска перемещается к последнему сообщению, хотя журнал поиска печатается правильно в журналах.
Я делаю здесь что-то не так?

Подробнее здесь: https://stackoverflow.com/questions/791 ... rmediately
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»