Как получить доступ к заголовкам в GlobalKTable в Kafka StreamsJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как получить доступ к заголовкам в GlobalKTable в Kafka Streams

Сообщение Anonymous »

Я пытаюсь использовать GlobalKTable для хранения данных из темы Kafka во время запуска моего приложения. И GlobalKTable, и KStream используют одну и ту же тему, мне нужен доступ к заголовкам в GlobalKTable для обработки. После начальной загрузки данных мой KStream будет обрабатывать записи в реальном времени. эти записи в реальном времени могут быть новыми записями или существовать в GlobalKTable. Проблема в том, что я не могу получить доступ к заголовкам Kafka в конфигурации GlobalKTable. Вот краткое изложение моей настройки:
Два класса конфигурации: один для GlobalKTable и один для KStream.
Мне нужно инициализировать KStream после полной загрузки GlobalKTable.
Вот класс GlobalKTableConfig, в котором я хочу получить доступ к заголовкам:

Код: Выделить всё

@Bean(name = "createGlobalKTable")
public GlobalKTable createGlobalKTable(@Qualifier("globalKTableStreamsBuilder") StreamsBuilderFactoryBean globalKTableStreamsBuilderFactoryBean) throws Exception {
StreamsBuilder streamsBuilder = globalKTableStreamsBuilderFactoryBean.getObject();
Map serdeConfig = new HashMap();
serdeConfig.put("schema.registry.url", schemaRegistryUrl);
serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);

final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);

return streamsBuilder.globalTable(ediLegTopic,
Materialized.as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde));

}
это класс KstreamConfig:

Код: Выделить всё

    @Bean
public StreamsBuilderFactoryBean kStreamBuilder(
@Qualifier("kafkaStreamsConfig") KafkaStreamsConfiguration config,
GlobalKTableService globalKTableService) {
StreamsBuilderFactoryBean factoryBean = new StreamsBuilderFactoryBean(config, new CleanupConfig(false, false));
factoryBean.setKafkaStreamsCustomizer(kafkaStreams -> {
kafkaStreams.setStateListener((newState, oldState) -> {
log.info("State changed from " + oldState + " to " + newState);
if (newState == KafkaStreams.State.RUNNING) {
KeyValueIterator keyValueIterator = globalKTableService.getAllValuesFromGlobalKTable();
// process each record regarding the logic
}
});
});
return factoryBean;
}

@Bean
public KStream createKStream(
@Qualifier("kStreamBuilder") StreamsBuilderFactoryBean kStreamsBuilderFactoryBean) throws Exception {
StreamsBuilder kStreamBuilder = kStreamsBuilderFactoryBean.getObject();

Map serdeConfig = new HashMap();
serdeConfig.put("schema.registry.url", schemaRegistryUrl);
serdeConfig.put("schema.registry.basic.auth.user.info", basicAuthUserInfo);
serdeConfig.put("schema.registry.basic.auth.credentials.source", basicAuthCredentialsSource);

final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);
KStream kStream = kStreamBuilder.stream(ediLegTopic, Consumed.with(Serdes.String(), valueGenericAvroSerde));

StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName + "-global"),
Serdes.String(),
valueGenericAvroSerde
);
kStreamBuilder.addStateStore(storeBuilder);

KStream processedStream = kStream.process(() -> new CustomProcessor(globalKTableService, logicProvider), storeName + "-global");
return processedStream;
}
}
У меня есть процессор, который применяет логику к каждой записи в реальном времени:

Код: Выделить всё

public class CustomProcessor implements Processor {
private ProcessorContext context;
private final GlobalKTableService globalKTableService;
private final LogicProvider logicProvider;

public CustomProcessor(GlobalKTableService globalKTableService, LogicProvider logicProvider) {
this.globalKTableService = globalKTableService;
this.logicProvider= logicProvider;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
}

@Override
public void process(Record  processingRecord) {
String key = processingRecord.key();
GenericRecord value = processingRecord.value();
Headers headers = processingRecord.headers();

GenericRecord globalTableValue = globalKTableService.getValueFromGlobalKTable(key);

if ((!value.equals(globalTableValue))) {
log.info("No match in GlobalKTable. Forwarding stream record with key: {}", key);
logicProvider.applyLogic(new KeyValue(key, value), headers);
context.forward(processingRecord.withValue(value));
} else {
log.info("Match found in GlobalKTable for key: {}. Skipping forward.", key);
}
}

@Override
public void close() {
log.info("Closing CustomProcessor");
}
}
Как я могу получить доступ к заголовкам Kafka в конфигурации GlobalKTable?
Эта ссылка Kafka Streams GlobalKTable и доступ к заголовкам записей имеет соответствующую тему, но я не могу понять, что это значит .

Подробнее здесь: https://stackoverflow.com/questions/790 ... ka-streams
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»