В настоящее время я работаю над приложением Kafka Streams, где пытаюсь настроить GlobalKTable и KStream, используя одну и ту же тему (sampleTopic). Однако при попытке запустить приложение я сталкиваюсь со следующей ошибкой:
Не удалось создать экземпляр [org.apache.kafka.streams.KafkaStreams]: Фабричный метод «kafkaStreams» выдал исключение с сообщением: Неверная топология: образец темыTopic уже зарегистрирован другим источником.
Как решить проблему, продолжая использовать оба источника? GlobalKTable и KStream для одной и той же темы?
@Bean
public KafkaStreams kafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericRecord.class.getName());
Map serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
// GlobalKTable
GlobalKTable globalKTable = streamsBuilder.globalTable(sampleTopic,
Materialized.as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde)
);
// KStream
KStream kStream = streamsBuilder.stream(sampleTopic,
Consumed.with(Serdes.String(), valueGenericAvroSerde));
KStream joinedStream = kStream.leftJoin(
globalKTable,
(key, valueFromStream) -> key,
(valueFromStream, valueFromTable) -> {
if (valueFromTable == null) {
return valueFromStream;
} else {
return null;
}
}
);
processJoinedStream(joinedStream);
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration().asProperties());
streams.start();
return streams;
}
private void processJoinedStream(KStream joinedStream) {
joinedStream.foreach((key, value) -> {
List consumerRecords = Collections.singletonList(
new ConsumerRecord(key, 0, 0L, key, value));
dataProvider.consumeRecords(consumerRecords);
});
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... ed-by-anot
Ошибка Kafka Streams: неверная топология: тема уже зарегистрирована другим источником. ⇐ JAVA
Программисты JAVA общаются здесь
1727350117
Anonymous
В настоящее время я работаю над приложением Kafka Streams, где пытаюсь настроить GlobalKTable и KStream, используя одну и ту же тему (sampleTopic). Однако при попытке запустить приложение я сталкиваюсь со следующей ошибкой:
Не удалось создать экземпляр [org.apache.kafka.streams.KafkaStreams]: Фабричный метод «kafkaStreams» выдал исключение с сообщением: Неверная топология: образец темыTopic уже зарегистрирован другим источником.
Как решить проблему, продолжая использовать оба источника? GlobalKTable и KStream для одной и той же темы?
@Bean
public KafkaStreams kafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericRecord.class.getName());
Map serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
// GlobalKTable
GlobalKTable globalKTable = streamsBuilder.globalTable(sampleTopic,
Materialized.as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde)
);
// KStream
KStream kStream = streamsBuilder.stream(sampleTopic,
Consumed.with(Serdes.String(), valueGenericAvroSerde));
KStream joinedStream = kStream.leftJoin(
globalKTable,
(key, valueFromStream) -> key,
(valueFromStream, valueFromTable) -> {
if (valueFromTable == null) {
return valueFromStream;
} else {
return null;
}
}
);
processJoinedStream(joinedStream);
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration().asProperties());
streams.start();
return streams;
}
private void processJoinedStream(KStream joinedStream) {
joinedStream.foreach((key, value) -> {
List consumerRecords = Collections.singletonList(
new ConsumerRecord(key, 0, 0L, key, value));
dataProvider.consumeRecords(consumerRecords);
});
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79027027/kafka-streams-error-invalid-topology-topic-has-already-been-registered-by-anot[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия