Запретить Kafka Streams Consumer писать смещения / дождаться, пока один поток израсходует все записи, прежде чем запускаJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Запретить Kafka Streams Consumer писать смещения / дождаться, пока один поток израсходует все записи, прежде чем запуска

Сообщение Anonymous »

Это вопрос «2 в 1», касающийся потоков.
Я работаю над сервисом, состоящим из двух потоков. Один (первый) должен использовать всю тему, получая пары ключ/значение и сохраняя их информацию в локальном HashMap
Как только в этом потоке больше нет задержки, запускается второй поток и поглощает другую тему. На основе использованных данных и одного из их атрибутов на основе записей HashMap решается, будет ли запись удалена или обработана дальше.
Поэтому первый поток должен достичь конца своей темы до того, как начнется второй поток.Я бы хотел, чтобы служба не сохраняла состояние и, таким образом, не сохраняла данные в хранилище состояний. Это создает две проблемы:
  • Обычный потребитель Kafka может быть настроен так, чтобы он не записывал свои смещения и, таким образом, использовал тему снова и снова при каждом перезапуске. .

Код: Выделить всё

enable.auto.commit = false
auto.offset.reset = earliest
Однако с потоками это не работает. Мое временное решение — сгенерировать ApplicationID со случайной частью, чтобы игнорировать ранее записанные смещения.
Это создает новую группу потребителей для каждого экземпляра, в результате чего в брокере появляется множество групп.
-> Есть ли способ настроить потоковый клиент так, чтобы он НЕ записывал смещения?
  • Чтобы второй поток подождите, пока будет израсходована первая тема, я пробовал разные реализации. Мой последний подход выглядит следующим образом:

Код: Выделить всё

      try {
firstStreams.start();
// Waiting for consumer to start...
while (computeLag(firstStreams.metrics().entrySet()) < 1) {
sleep(100);
}
// Waiting for Lag to reach 0
while (computeLag(firstStreams.metrics().entrySet()) > 1) {
sleep(100);
}

secondStream.start();
shutdownLatch.await();
} catch (Throwable e) {
System.exit(1);
}
}
System.exit(0);
}

private static double computeLag(Set

Подробнее здесь: [url]https://stackoverflow.com/questions/76924206/prevent-kafka-streams-consumer-from-writing-offsets-wait-for-one-stream-to-con[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»