Kafka Streams и Spring Batch Conflict – «Необходимо указать имя задания», несмотря на Spring.batch.job.enabled: falseJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Kafka Streams и Spring Batch Conflict – «Необходимо указать имя задания», несмотря на Spring.batch.job.enabled: false

Сообщение Anonymous »

У меня есть приложение Spring Boot, в котором я использую Kafka Streams (KStream и GlobalKTable) вместе с пакетным заданием Spring. Несмотря на настройку Spring.batch.job.enabled: false, я продолжаю сталкиваться со следующей ошибкой:

Вызвано: java.lang.IllegalArgumentException: Имя задания должно быть указывается в случае нескольких заданий

GlobalKTableConfig: создает GlobalKTable с использованием Kafka Streams:

Код: Выделить всё

@Slf4j
@Configuration
public class GlobalKTableConfig {

@Value("${spring.kafka.streams.properties.store-name}")
private String storeName;

@Value("${spring.kafka.streams.properties.schema.registry.url-config}")
private String schemaRegistryUrl;

@Value("${kafka.topic.name}")
private String topic;

@Value("${global-kafka-streams.application-id}")
private String globalTableAppId;

@Value("${spring.kafka.streams.bootstrap-servers}")
private String bootstrapServers;

@Bean(name = "globalKTableStreamsConfig")
public KafkaStreamsConfiguration globalKTableStreamsConfig() {
Map props = new HashMap();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, globalTableAppId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new KafkaStreamsConfiguration(props);
}

@Bean(name = "globalKTableStreamsBuilder")
public StreamsBuilderFactoryBean globalKTableStreamsBuilder(
@Qualifier("globalKTableStreamsConfig") KafkaStreamsConfiguration globalKTableStreamsConfig) {
return new StreamsBuilderFactoryBean(globalKTableStreamsConfig, new CleanupConfig(true, true));
}

@Bean
public GlobalKTable createGlobalKTable(
@Qualifier("globalKTableStreamsBuilder") StreamsBuilderFactoryBean globalKTableStreamsBuilderFactoryBean) throws Exception {

StreamsBuilder streamsBuilder = globalKTableStreamsBuilderFactoryBean.getObject();//5

Map serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);

return streamsBuilder.globalTable(ediLegTopic,
Materialized.as(storeName + "-global")
.withKeySerde(Serdes.String())
.withValueSerde(valueGenericAvroSerde));
}
}
KStreamsConfig: создает KStream и объединяет его с GlobalKTable:

Код: Выделить всё

@Configuration
@Slf4j
@DependsOn("createGlobalKTable")
public class KStreamsConfig {

@Value("${kafka.topic.name}")
private String topic;

@Value("${spring.kafka.streams.application-id}")
private String applicationId;

@Value("${spring.kafka.streams.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.streams.properties.schema.registry.url-config}")
private String schemaRegistryUrl;

@Value("${kstream-kafka-streams.application-id}")
private String kStreamAppId;

@Value("${spring.kafka.streams.properties.store-name}")
private String storeName;

@Bean(name = "kafkaStreamsConfig")
public KafkaStreamsConfiguration kafkaStreamsConfig() {
Map props = new HashMap();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, kStreamAppId);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
return new KafkaStreamsConfiguration(props);
}

@Bean(name = "kStreamBuilder")
public StreamsBuilderFactoryBean kStreamBuilder(
@Qualifier("kafkaStreamsConfig") KafkaStreamsConfiguration config) {
return new StreamsBuilderFactoryBean(config, new CleanupConfig(true, true));
}

@Bean
public KStream createKStream(@Qualifier("kStreamBuilder") StreamsBuilder kStreamBuilder,
@Qualifier("createGlobalKTable") GlobalKTable globalKTable) {
Map  serdeConfig = Collections.singletonMap("schema.registry.url", schemaRegistryUrl);
final Serde valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false);

KStream kStream = kStreamBuilder.stream(ediLegTopic, Consumed.with(Serdes.String(), valueGenericAvroSerde));

KStream joinedStream = kStream.leftJoin(globalKTable,
(key, value) -> key,
(streamValue, tableValue) -> {
if (tableValue != null) {
return null;
}
return streamValue;
});

processJoinedStream(joinedStream);

return joinedStream;
}

private void processJoinedStream(KStream joinedStream) {
joinedStream.foreach((key, value) -> {
// processing logic here
});
}
}
syncBatchConfiguration: определяет пакетное задание Spring:

Код: Выделить всё

@Slf4j
@Configuration
public class syncBatchConfiguration {

public syncBatchConfiguration(
// configuration
}

@Bean
public ItemReader itemReader(
}

@Bean
public ItemProcessor itemProcessor() {
}

@Bean
public ItemWriter itemWriter() {
RepositoryItemWriter itemWriter = new RepositoryItemWriter();
}

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
log.info("Creating ThreadPoolTaskExecutor...");
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
return taskExecutor;
}

@Bean
protected Step process(JobRepository jobRepository,
//implement step
}

@Bean
public Job processJob(JobRepository jobRepository, Step processStep) {
//implement job
}

@Bean
public TaskExecutorJobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
Я уже пробовал использовать @DependsOn, @Primary и исключения = {BatchAutoConfiguration.class}, но ни один из них не решил проблему. Когда я удаляю аннотацию @Configuration из syncBatchConfiguration, приложение работает нормально.
Я хочу, чтобы Kafka Streams (KStream и GlobalKTable) корректно работал вместе с заданием Spring Batch без конфликтов. Как я могу разрешить этот конфликт и предотвратить вмешательство Spring Batch в Kafka Stream.

Подробнее здесь: https://stackoverflow.com/questions/790 ... -despite-s
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Swiperefreshlayout Conflict Conflict с фрагментами
    Anonymous » » в форуме Android
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • Разница CoroutineScope(Dispatchers.IO + Job()) и CoroutineScope(Dispatcher.IO) + Job()
    Anonymous » » в форуме Android
    0 Ответы
    43 Просмотры
    Последнее сообщение Anonymous
  • Разница CoroutineScope(Dispatchers.IO + Job()) и CoroutineScope(Dispatcher.IO) + Job()
    Anonymous » » в форуме Android
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Разница между CoroutineScope(Dispatchers.IO + Job()) и CoroutineScope(Dispatcher.IO) + Job()
    Anonymous » » в форуме Android
    0 Ответы
    37 Просмотры
    Последнее сообщение Anonymous
  • Не удалось скопировать входной тензор из /job:localhost/replica:0/task:0/device:CPU:0 в /job:localhost/replica:0/task:0/
    Anonymous » » в форуме Python
    0 Ответы
    38 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»