Не удалось создать экземпляр [org.apache.kafka.streams.KafkaStreams]: Фабричный метод «kafkaStreams» выдал исключение с сообщением: Неверная топология: образец темыТема уже зарегистрирована другим источником.
Код: Выделить всё
@Bean
public KafkaStreams kafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericRecord.class.getName());
Map serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
// GlobalKTable
GlobalKTable globalKTable = streamsBuilder.globalTable(sampleTopic,
Materialized.as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde)
);
// KStream
KStream kStream = streamsBuilder.stream(sampleTopic,
Consumed.with(Serdes.String(), valueGenericAvroSerde));
KStream joinedStream = kStream.leftJoin(
globalKTable,
(key, valueFromStream) -> key,
(valueFromStream, valueFromTable) -> {
if (valueFromTable == null) {
return valueFromStream;
} else {
return null;
}
}
);
processJoinedStream(joinedStream);
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration().asProperties());
streams.start();
return streams;
}
private void processJoinedStream(KStream joinedStream) {
joinedStream.foreach((key, value) -> {
List consumerRecords = Collections.singletonList(
new ConsumerRecord(key, 0, 0L, key, value));
dataProvider.consumeRecords(consumerRecords);
});
}
Будем очень признательны за любые идеи и предложения!
Подробнее здесь: https://stackoverflow.com/questions/790 ... ed-by-anot
Мобильная версия