У меня есть два класса конфигурации: один для GlobalKTable, другой для KStream.
Мне нужно, чтобы KStream инициализировался только после полной загрузки GlobalKTable.
Вот класс GlobalKTableConfig, в котором я хочу получить доступ к заголовкам:
Код: Выделить всё
/**
 * Registers a global table over {@code ediLegTopic} on the builder supplied by the
 * given factory bean.
 *
 * <p>Keys use the String serde; values use a {@code GenericAvroSerde} configured as a
 * value serde with the Schema Registry URL and basic-auth settings held by this class.
 * The table is materialized under the store name {@code storeName + "-global"}.
 *
 * <p>The return type is left raw so the bean signature stays exactly as it was.
 *
 * @param globalKTableStreamsBuilderFactoryBean factory bean whose {@code StreamsBuilder}
 *        the global table is registered on
 * @return the global table registered on that builder
 * @throws Exception propagated from {@code getObject()} on the factory bean
 */
@Bean(name = "createGlobalKTable")
public GlobalKTable createGlobalKTable(@Qualifier("globalKTableStreamsBuilder") StreamsBuilderFactoryBean globalKTableStreamsBuilderFactoryBean) throws Exception {
    StreamsBuilder streamsBuilder = globalKTableStreamsBuilderFactoryBean.getObject();

    Map<String, Object> serdeConfig = new HashMap<>();
    serdeConfig.put("schema.registry.url", schemaRegistryUrl);
    serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
    serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);

    // 'false' marks the serde as a value serde (not a key serde).
    final Serde<org.apache.avro.generic.GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
    valueGenericAvroSerde.configure(serdeConfig, false);

    // Materialized.as(...) needs an explicitly typed target: when it is chained directly,
    // K and V are inferred as Object and withKeySerde(Serdes.String()) does not compile.
    // Names are fully qualified because this file's import block is not visible here.
    final Materialized<String, org.apache.avro.generic.GenericRecord,
            org.apache.kafka.streams.state.KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized =
            Materialized.as(storeName + "-global");

    return streamsBuilder.globalTable(ediLegTopic,
            materialized
                    .withKeySerde(Serdes.String())
                    .withValueSerde(valueGenericAvroSerde));
}
Класс конфигурации KStream:
/**
 * Creates the {@code StreamsBuilderFactoryBean} for the KStream topology and installs a
 * state listener on the resulting {@code KafkaStreams} instance.
 *
 * <p>Every state transition is logged. Each time the instance enters
 * {@code KafkaStreams.State.RUNNING}, the listener obtains an iterator over the global
 * table contents from {@code globalKTableService}.
 *
 * @param config Kafka Streams configuration for this factory bean
 * @param globalKTableService service used to read the global table once the streams
 *        instance reports RUNNING
 * @return the configured factory bean
 */
@Bean
public StreamsBuilderFactoryBean kStreamBuilder(
        @Qualifier("kafkaStreamsConfig") KafkaStreamsConfiguration config,
        GlobalKTableService globalKTableService) {
    StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(config, new CleanupConfig(false, false));
    factoryBean.setKafkaStreamsCustomizer(kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            log.info("State changed from {} to {}", oldState, newState);
            if (newState == KafkaStreams.State.RUNNING) {
                // A KeyValueIterator must be closed to release the underlying store
                // resources; try-with-resources guarantees that even if processing throws.
                try (KeyValueIterator<?, ?> keyValueIterator = globalKTableService.getAllValuesFromGlobalKTable()) {
                    // process each record regarding the logic
                }
            }
        });
    });
    return factoryBean;
}
/**
 * Builds the KStream topology on the builder supplied by the given factory bean.
 *
 * <p>Records are consumed from {@code ediLegTopic} with String keys and Avro values,
 * a persistent key-value store named {@code storeName + "-global"} is added to the
 * builder, and every record is passed through a {@code CustomProcessor} that is
 * connected to that store.
 *
 * @param kStreamsBuilderFactoryBean factory bean providing the {@code StreamsBuilder}
 * @return the stream produced by the processor step
 * @throws Exception propagated from {@code getObject()} on the factory bean
 */
@Bean
public KStream createKStream(
        @Qualifier("kStreamBuilder") StreamsBuilderFactoryBean kStreamsBuilderFactoryBean) throws Exception {
    final StreamsBuilder builder = kStreamsBuilderFactoryBean.getObject();

    // Schema Registry connection settings for the Avro value serde.
    final Map<String, Object> registrySettings = new HashMap<>();
    registrySettings.put("schema.registry.url", schemaRegistryUrl);
    registrySettings.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
    registrySettings.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);

    final Serde avroValueSerde = new GenericAvroSerde();
    avroValueSerde.configure(registrySettings, false);

    final KStream source = builder.stream(ediLegTopic, Consumed.with(Serdes.String(), avroValueSerde));

    // The store is registered on the builder and then connected to the processor by name.
    final String processorStoreName = storeName + "-global";
    builder.addStateStore(
            Stores.keyValueStoreBuilder(
                    Stores.persistentKeyValueStore(processorStoreName),
                    Serdes.String(),
                    avroValueSerde));

    return source.process(() -> new CustomProcessor(globalKTableService, logicProvider), processorStoreName);
}
}
Класс CustomProcessor:
public class CustomProcessor implements Processor {
private ProcessorContext context;
private final GlobalKTableService globalKTableService;
private final LogicProvider logicProvider;
public CustomProcessor(GlobalKTableService globalKTableService, LogicProvider logicProvider) {
this.globalKTableService = globalKTableService;
this.logicProvider= logicProvider;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Record processingRecord) {
String key = processingRecord.key();
GenericRecord value = processingRecord.value();
Headers headers = processingRecord.headers();
GenericRecord globalTableValue = globalKTableService.getValueFromGlobalKTable(key);
if ((!value.equals(globalTableValue))) {
log.info("No match in GlobalKTable. Forwarding stream record with key: {}", key);
logicProvider.applyLogic(new KeyValue(key, value), headers);
context.forward(processingRecord.withValue(value));
} else {
log.info("Match found in GlobalKTable for key: {}. Skipping forward.", key);
}
}
@Override
public void close() {
log.info("Closing CustomProcessor");
}
}
Подробнее здесь: https://stackoverflow.com/questions/790 ... ka-streams
Мобильная версия