Ошибка Kafka Streams: неверная топология: тема уже зарегистрирована другим источником.JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Ошибка Kafka Streams: неверная топология: тема уже зарегистрирована другим источником.

Сообщение Anonymous »

В настоящее время я работаю над приложением Kafka Streams, где пытаюсь настроить GlobalKTable и KStream, используя одну и ту же тему (sampleTopic). Однако при попытке запустить приложение я сталкиваюсь со следующей ошибкой:

Не удалось создать экземпляр [org.apache.kafka.streams.KafkaStreams]: Фабричный метод «kafkaStreams» выдал исключение с сообщением: Неверная топология: образец темыТема уже зарегистрирована другим источником.

Код: Выделить всё

@Bean
public KafkaStreams kafkaStreams(@Qualifier("streamsBuilder") StreamsBuilder streamsBuilder) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericRecord.class.getName());

Map serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);

// GlobalKTable
GlobalKTable globalKTable = streamsBuilder.globalTable(sampleTopic,
Materialized.as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde)
);

// KStream
KStream kStream = streamsBuilder.stream(sampleTopic,
Consumed.with(Serdes.String(), valueGenericAvroSerde));

KStream joinedStream = kStream.leftJoin(
globalKTable,
(key, valueFromStream) -> key,
(valueFromStream, valueFromTable) -> {
if (valueFromTable == null) {
return valueFromStream;
} else {
return null;
}
}
);

processJoinedStream(joinedStream);

KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), kafkaStreamsConfiguration().asProperties());
streams.start();

return streams;
}

private void processJoinedStream(KStream joinedStream) {
joinedStream.foreach((key, value) -> {
List consumerRecords = Collections.singletonList(
new ConsumerRecord(key, 0, 0L, key, value));
dataProvider.consumeRecords(consumerRecords);
});
}
Как решить проблему, продолжая использовать GlobalKTable и KStream для одной и той же темы?
Будем очень признательны за любые идеи и предложения!

Подробнее здесь: https://stackoverflow.com/questions/790 ... ed-by-anot
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»