Caused by: java.lang.IllegalArgumentException: Job name must be specified in case of multiple jobs
GlobalKTableConfig: creates a GlobalKTable using Kafka Streams:
Code:
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@Configuration
public class GlobalKTableConfig {

    @Value("${spring.kafka.streams.properties.store-name}")
    private String storeName;
    @Value("${spring.kafka.streams.properties.schema.registry.url-config}")
    private String schemaRegistryUrl;
    @Value("${kafka.topic.name}")
    private String topic;
    @Value("${global-kafka-streams.application-id}")
    private String globalTableAppId;
    @Value("${spring.kafka.streams.bootstrap-servers}")
    private String bootstrapServers;

    // Separate Streams configuration (its own application.id) for the GlobalKTable topology.
    @Bean(name = "globalKTableStreamsConfig")
    public KafkaStreamsConfiguration globalKTableStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, globalTableAppId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        return new KafkaStreamsConfiguration(props);
    }

    @Bean(name = "globalKTableStreamsBuilder")
    public StreamsBuilderFactoryBean globalKTableStreamsBuilder(
            @Qualifier("globalKTableStreamsConfig") KafkaStreamsConfiguration globalKTableStreamsConfig) {
        return new StreamsBuilderFactoryBean(globalKTableStreamsConfig, new CleanupConfig(true, true));
    }

    // Builds the GlobalKTable over the topic, materialized as "<store-name>-global".
    @Bean
    public GlobalKTable<String, GenericRecord> createGlobalKTable(
            @Qualifier("globalKTableStreamsBuilder") StreamsBuilderFactoryBean globalKTableStreamsBuilderFactoryBean)
            throws Exception {
        StreamsBuilder streamsBuilder = globalKTableStreamsBuilderFactoryBean.getObject();
        Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
        final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
        valueGenericAvroSerde.configure(serdeConfig, false);
        return streamsBuilder.globalTable(topic,
                Materialized.<String, GenericRecord, KeyValueStore<Bytes, byte[]>>as(storeName + "-global")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(valueGenericAvroSerde));
    }
}
Code:
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.core.CleanupConfig;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

@Configuration
@Slf4j
@DependsOn("createGlobalKTable")
public class KStreamsConfig {

    @Value("${kafka.topic.name}")
    private String topic;
    @Value("${spring.kafka.streams.application-id}")
    private String applicationId;
    @Value("${spring.kafka.streams.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.streams.properties.schema.registry.url-config}")
    private String schemaRegistryUrl;
    @Value("${kstream-kafka-streams.application-id}")
    private String kStreamAppId;
    @Value("${spring.kafka.streams.properties.store-name}")
    private String storeName;

    // Separate Streams configuration (its own application.id) for the KStream topology.
    @Bean(name = "kafkaStreamsConfig")
    public KafkaStreamsConfiguration kafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamAppId);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        return new KafkaStreamsConfiguration(props);
    }

    @Bean(name = "kStreamBuilder")
    public StreamsBuilderFactoryBean kStreamBuilder(
            @Qualifier("kafkaStreamsConfig") KafkaStreamsConfiguration config) {
        return new StreamsBuilderFactoryBean(config, new CleanupConfig(true, true));
    }

    @Bean
    public KStream<String, GenericRecord> createKStream(
            @Qualifier("kStreamBuilder") StreamsBuilder kStreamBuilder,
            @Qualifier("createGlobalKTable") GlobalKTable<String, GenericRecord> globalKTable) {
        Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
        final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
        valueGenericAvroSerde.configure(serdeConfig, false);
        KStream<String, GenericRecord> kStream =
                kStreamBuilder.stream(topic, Consumed.with(Serdes.String(), valueGenericAvroSerde));
        // Left join against the global table: records whose key is already present
        // in the table are mapped to null, otherwise the stream value is kept.
        KStream<String, GenericRecord> joinedStream = kStream.leftJoin(globalKTable,
                (key, value) -> key,
                (streamValue, tableValue) -> {
                    if (tableValue != null) {
                        return null;
                    }
                    return streamValue;
                });
        processJoinedStream(joinedStream);
        return joinedStream;
    }

    private void processJoinedStream(KStream<String, GenericRecord> joinedStream) {
        joinedStream.foreach((key, value) -> {
            // processing logic here
        });
    }
}
Code:
@Slf4j
@Configuration
public class syncBatchConfiguration {

    public syncBatchConfiguration() {
        // configuration
    }

    @Bean
    public ItemReader itemReader() {
        // implement reader
    }

    @Bean
    public ItemProcessor itemProcessor() {
        // implement processor
    }

    @Bean
    public ItemWriter itemWriter() {
        RepositoryItemWriter itemWriter = new RepositoryItemWriter();
        // configure repository and method, then return the writer
        return itemWriter;
    }

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        log.info("Creating ThreadPoolTaskExecutor...");
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        return taskExecutor;
    }

    @Bean
    protected Step process(JobRepository jobRepository) {
        // implement step
    }

    @Bean
    public Job processJob(JobRepository jobRepository, Step processStep) {
        // implement job
    }

    // Asynchronous launcher so job execution does not block the calling thread.
    @Bean
    public TaskExecutorJobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
        TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
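The elided process and processJob beans are also where the job name mentioned in the startup error is defined. A minimal sketch of how they could be filled in with the Spring Batch 5 builders, assuming Spring Batch 5, a Boot-managed PlatformTransactionManager, a hypothetical MyEntity item type, and a placeholder chunk size of 100 (none of these appear in the original configuration):

Code:
// Additional imports needed in syncBatchConfiguration for this sketch:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

@Bean
protected Step process(JobRepository jobRepository,
                       PlatformTransactionManager transactionManager,
                       ItemReader<MyEntity> itemReader,           // MyEntity is a placeholder item type
                       ItemProcessor<MyEntity, MyEntity> itemProcessor,
                       ItemWriter<MyEntity> itemWriter,
                       ThreadPoolTaskExecutor taskExecutor) {
    return new StepBuilder("processStep", jobRepository)           // explicit step name
            .<MyEntity, MyEntity>chunk(100, transactionManager)    // placeholder chunk size
            .reader(itemReader)
            .processor(itemProcessor)
            .writer(itemWriter)
            .taskExecutor(taskExecutor)
            .build();
}

@Bean
public Job processJob(JobRepository jobRepository, Step processStep) {
    // "processJob" is the name that spring.batch.job.name is matched against
    // when Spring Boot finds more than one Job bean at startup.
    return new JobBuilder("processJob", jobRepository)
            .incrementer(new RunIdIncrementer())
            .start(processStep)
            .build();
}

If several Job beans end up in the context, spring.batch.job.name=processJob tells the auto-run which one to execute; alternatively the auto-run can be disabled entirely, as sketched after the question below.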
I want Kafka Streams (the KStream and the GlobalKTable) to work correctly alongside the Spring Batch job without conflicts. How can I resolve this conflict and keep Spring Batch from interfering with the Kafka Streams?
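A minimal sketch of one way to keep the two subsystems apart, assuming spring.batch.job.enabled=false is set so that Spring Boot's auto-run never has to pick a job at startup, and a hypothetical StartupJobRunner class (not in the original code) that triggers processJob explicitly through the asynchronous asyncJobLauncher:

Code:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

// Hypothetical runner, not part of the original post: with spring.batch.job.enabled=false
// the "Job name must be specified in case of multiple jobs" check is never reached, and the
// batch job is started here instead, after the context (including both
// StreamsBuilderFactoryBean topologies) is up.
@Component
public class StartupJobRunner implements ApplicationRunner {

    private final JobLauncher jobLauncher;
    private final Job processJob;

    public StartupJobRunner(@Qualifier("asyncJobLauncher") JobLauncher jobLauncher,
                            @Qualifier("processJob") Job processJob) {
        this.jobLauncher = jobLauncher;
        this.processJob = processJob;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Unique parameters so repeated runs create new job instances.
        JobParameters params = new JobParametersBuilder()
                .addLong("startedAt", System.currentTimeMillis())
                .toJobParameters();
        jobLauncher.run(processJob, params);   // returns immediately (SimpleAsyncTaskExecutor)
    }
}

Because asyncJobLauncher is backed by a SimpleAsyncTaskExecutor, run() hands the job off to a separate thread, so the Kafka Streams topologies started by the two StreamsBuilderFactoryBeans are not blocked by the batch processing.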
More details here: https://stackoverflow.com/questions/790 ... -despite-s