Я работаю над сервисом, состоящим из двух потоков. Один (первый) должен использовать всю тему, получая пары ключ/значение и сохраняя их информацию в локальном HashMap
Как только в этом потоке больше нет задержки, запускается второй поток и поглощает другую тему. На основе использованных данных и одного из их атрибутов на основе записей HashMap решается, будет ли запись удалена или обработана дальше.
Поэтому первый поток должен достичь конца своей темы до того, как начнется второй поток.Я бы хотел, чтобы служба не сохраняла состояние и, таким образом, не сохраняла данные в хранилище состояний. Это создает две проблемы:
- Обычный потребитель Kafka может быть настроен так, чтобы он не записывал свои смещения и, таким образом, использовал тему снова и снова при каждом перезапуске. .
Код: Выделить всё
enable.auto.commit = false
auto.offset.reset = earliest
Это создает новую группу потребителей для каждого экземпляра, в результате чего в брокере появляется множество групп.
-> Есть ли способ настроить потоковый клиент так, чтобы он НЕ записывал смещения?
- Чтобы второй поток подождите, пока будет израсходована первая тема, я пробовал разные реализации. Мой последний подход выглядит следующим образом:
Код: Выделить всё
try {
firstStreams.start();
// Waiting for consumer to start...
while (computeLag(firstStreams.metrics().entrySet()) < 1) {
sleep(100);
}
// Waiting for Lag to reach 0
while (computeLag(firstStreams.metrics().entrySet()) > 1) {
sleep(100);
}
secondStream.start();
shutdownLatch.await();
} catch (Throwable e) {
System.exit(1);
}
}
System.exit(0);
}
private static double computeLag(Set
Подробнее здесь: [url]https://stackoverflow.com/questions/76924206/prevent-kafka-streams-consumer-from-writing-offsets-wait-for-one-stream-to-con[/url]