Как заполнить два магазина по одной теме с помощью Kafka Streams GlobalKTableJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как заполнить два магазина по одной теме с помощью Kafka Streams GlobalKTable

Сообщение Anonymous »

Я использую Kafka Streams и пытаюсь найти лучший способ обработки нескольких хранилищ, заполненных одними и теми же данными, без необходимости читать их дважды.

Моя служба Kafka Streams требуется для перезагрузки всей темы Kafka при запуске, а затем подписки на любое новое сообщение, размещенное по этой теме. Магазины должны быть доступны везде, и поэтому я прочитал о GlobalKTables. Это очень хорошо подходит для моего варианта использования, поскольку я не могу указать случайный идентификатор группы для своего потребителя, а GlobalKTables, похоже, читает все для каждого запуска. Пока все хорошо.
Однако я, похоже, ограничен в заполнении более чем одного хранилища, используя одно и то же глобальное хранилище, и мне бы хотелось избежать необходимости читать каждое сообщение дважды или даже n-количество раз, где n — это количество магазинов, которое мне нужно для любой конкретной темы.
Я столкнулся с примером, подобным приведенному ниже, с двумя хранилищами и необходимостью вручную добавлять их в мои хранилища состояния. Глобальное хранилище ссылается только на myStoreBuilder, а не на оба из них. Это рекомендуемый подход или есть ли лучший способ сделать это? Мой GlobalProcessor в настоящее время извлекает оба хранилища и заполняет оба, но что-то подсказывает мне, что это обходной путь, и, возможно, я не предназначен для работы с двумя хранилищами в одном процессоре.

Код: Выделить всё

var myStoreBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("MY_STORE_NAME"),
Serdes.String(),
Serdes.String()
);

var myStoreBuilder2 = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("MY_STORE_NAME2"),
Serdes.String(),
Serdes.String()
);

myStreamsBuilder().addStateStore(myStoreBuilder);
myStreamsBuilder().addStateStore(myStoreBuilder2);

myStreamsBuilder()().addGlobalStore(
myStoreBuilder,
"my_topic",
Consumed.with(Serdes.String(), Serdes.String()),
new DataGlobalProcessorSupplier()
);
Любая помощь приветствуется.


Подробнее здесь: https://stackoverflow.com/questions/798 ... obalktable
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»