Моя служба Kafka Streams требуется для перезагрузки всей темы Kafka при запуске, а затем подписки на любое новое сообщение, размещенное по этой теме. Магазины должны быть доступны везде, и поэтому я прочитал о GlobalKTables. Это очень хорошо подходит для моего варианта использования, поскольку я не могу указать случайный идентификатор группы для своего потребителя, а GlobalKTables, похоже, читает все для каждого запуска. Пока все хорошо.
Однако я, похоже, ограничен в заполнении более чем одного хранилища, используя одно и то же глобальное хранилище, и мне бы хотелось избежать необходимости читать каждое сообщение дважды или даже n-количество раз, где n — это количество магазинов, которое мне нужно для любой конкретной темы.
Я столкнулся с примером, подобным приведенному ниже, с двумя хранилищами и необходимостью вручную добавлять их в мои хранилища состояния. Глобальное хранилище ссылается только на myStoreBuilder, а не на оба из них. Это рекомендуемый подход или есть ли лучший способ сделать это? Мой GlobalProcessor в настоящее время извлекает оба хранилища и заполняет оба по моему желанию, но что-то подсказывает мне, что мой текущий подход является обходным путем. Возможно, я не предназначен для работы с двумя хранилищами в одном процессоре, но мне лучше вызвать addGlobalStore дважды — по одному разу для каждого из моих конструкторов хранилищ, а затем просто принять, что данные читаются дважды.
Код: Выделить всё
var myStoreBuilder = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("MY_STORE_NAME"),
Serdes.String(),
Serdes.String()
);
var myStoreBuilder2 = Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("MY_STORE_NAME2"),
Serdes.String(),
Serdes.String()
);
myStreamsBuilder().addStateStore(myStoreBuilder);
myStreamsBuilder().addStateStore(myStoreBuilder2);
myStreamsBuilder().addGlobalStore(
myStoreBuilder,
"my_topic",
Consumed.with(Serdes.String(), Serdes.String()),
new DataGlobalProcessorSupplier()
);
Подробнее здесь: https://stackoverflow.com/questions/798 ... obalktable
Мобильная версия