Мое требование — получить запись, сохранить эти записи на карте и вернуть эту карту другим методам, чтобы они могли обрабатывать и записывать окончательный вывод в файл.
Здесь добавляем метод запуска -
Код: Выделить всё
public Map run() throws NullPointerException {
kafkaConfigurations();
Map recordMap = new HashMap();
LOG.info("Started consuming kafka messages....");
mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
int i = 0;
// int recordLimit = 1000; // Set the limit to the desired number of records
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofSeconds(5));
LOG.debug(records.count() + "^^^^^^^size of records");
/* if (records.isEmpty()) {
LOG.info("No more records to consume. Exiting...");
break;
}*/
for (ConsumerRecord record : records) {
JsonNode jsonData = null;
try {
jsonData = mapper.readTree(record.value());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
// Validate JSON data
if (!validateJsonData(jsonData)) {
continue; // Skip invalid records
}
SaleRecord priceRecords = mapper.convertValue(jsonData, SaleRecord.class);
if (priceRecords != null && isValidSaleRecord(priceRecords)) {
recordMap.put(priceRecords.getInvoiceId(), priceRecords);
i++;
}
}
LOG.info("Done consuming kafka messages after {} iterations", i);
return recordMap;
}
}
Я пытался обработать 1 лакх записей, и это работает с терминальным состоянием, равным 100 000.
но мне приходится обрабатывать 20 лакхов в день, поэтому это занимает время и превращается в бесконечный цикл.
Я не знаю иметь большой опыт работы с Кафкой. Если кто-нибудь знает о свойстве kafkaListner или о каком-либо простом способе это сделать, пожалуйста, помогите.
Подробнее здесь: https://stackoverflow.com/questions/787 ... on-records
Мобильная версия