Мне нужно создать пакетное задание Spring, которое считывает данные из базы данных A, преобразует данные и сохраняет их в базе данных B. Раньше мне удавалось создать неразделенное пакетное задание, но когда я пытаюсь реализовать секционированное задание, оно показывает некоторые исключения.
Вот мой код, и для примечания я оставил несколько комментариев, просто чтобы сообщить, что я что-то пробовал заранее, может быть, кто-то может дать мне немного понимание этого.
Итак, в .itemReader исключением является:
Метод throw
'org.springframework.beans.factory.support. ScopeNotActiveException'
исключение. Невозможно оценить
org.springframework.batch.item.data.RepositoryItemReader$$SpringCGLIB$$0.toString()
в .itemProcessor, исключением является:
Метод throw
'org.springframework.beans.factory.support.ScopeNotActiveException'
исключение. Невозможно вычислить jdk.proxy2.$Proxy108.toString()
в .itemWriter, исключение:
Метод выдал
'org.springframework.beans.factory.support.ScopeNotActiveException'
исключение. Невозможно оценить jdk.proxy2.$Proxy109.toString()
Почему-то процессор чтения и запись просто опускаются в коде, и если я использую отладчиком, я получаю эти исключения.
@Configuration
@EnableBatchProcessing
public class NewPartitionedBatchConfig {
private final CustomerRepository customerRepository;
private final SourceMigrationDataChanger sourceMigrationDataChanger;
public NewPartitionedBatchConfig(
CustomerRepository customerRepository,
SourceMigrationDataChanger sourceMigrationDataChanger
) {
this.customerRepository = customerRepository;
this.sourceMigrationDataChanger = sourceMigrationDataChanger;
}
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
public Job sampleJob(
JobRepository jobRepository,
Step partitionedStep
) {
return new JobBuilder("sampleJob", jobRepository)
.start(partitionedStep)
.build();
}
// i've tried to declare it this way
// @Bean
// public Partitioner partitioner() {
// CustomPartitioner partitioner = new CustomPartitioner();
// partitioner.partition(GRID_SIZE);
// return partitioner;
// }
@Bean
public PartitionHandler partitionHandler(
JobExecutorConfig jobExecutorConfig,
Step workerStep
) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
// handler.setTaskExecutor(jobExecutorConfig.threadPoolTaskExecutor()); //i've made a ThreadPoolTaskExecutor too
handler.setTaskExecutor(jobExecutorConfig.simpleAsyncTaskExecutor());
handler.setStep(workerStep);
handler.setGridSize(GRID_SIZE);
return handler;
}
@Bean
public Step partitionedStep(
JobRepository jobRepository,
CustomPartitioner customPartitioner,
// BasicPartitioner basicPartitioner, //i've tried Basic Partitioner too
PartitionHandler partitionHandler
) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", customPartitioner)
// .step(workerStep()) //i've tried to put a step directly without partitionedHandler already
.partitionHandler(partitionHandler)
.build();
}
@Bean
public Step workerStep(
PlatformTransactionManager transactionManager,
JobRepository jobRepository,
RepositoryItemReader itemReader,
ItemProcessor itemProcessor,
ItemWriter itemWriter
) {
return new StepBuilder("workerStep", jobRepository)
.chunk(BATCH_SIZE, transactionManager)
.reader(itemReader)
.processor(itemProcessor)
.writer(itemWriter)
.build();
}
@Bean
@StepScope
public RepositoryItemReader itemReader(
@Value("#{stepExecutionContext['partitionIndex'] ?: 0}") Integer partitionIndex,
@Value("#{stepExecutionContext['gridSize']}") Integer gridSize
) {
if (partitionIndex == null) {
throw new IllegalStateException("partitionIndex is null. Ensure that the step execution context is properly set.");
}
RepositoryItemReader reader = new RepositoryItemReader();
reader.setRepository(customerRepository);
// reader.setMethodName("findCustomersByPage"); using JPA Pageable
reader.setMethodName("findCustomersByPartition");
reader.setArguments(Arrays.asList(partitionIndex, gridSize));
reader.setPageSize(BATCH_SIZE);
reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
return reader;
}
@Bean
@StepScope
public ItemProcessor itemProcessor() {
return customer -> {
sourceMigrationDataChanger.updateCustomerMigrated((Customer) customer);
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, customer);
return defaultKafkaHash;
};
}
@Bean
@StepScope
public ItemWriter itemWriter() {
return items -> {
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, items);
// it used to send some data through kafka, but omit this for now
};
}
}
Теперь мой пользовательский разделитель
@Getter
@Component
public class CustomPartitioner implements Partitioner {
// public Map partitions = new HashMap();
@Override
public Map partition(int gridSize) {
System.out.println("Partitioning with gridSize: " + gridSize);
gridSize=2;
Map partitions = new HashMap();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("partitionIndex", i);
context.putInt("gridSize", gridSize);
context.putInt("startIndex", i * BATCH_SIZE);
context.putInt("pageSize", BATCH_SIZE);
partitions.put(PARTITION_KEY + i, context);
}
return partitions;
}
}
Подробнее здесь: https://stackoverflow.com/questions/793 ... ting-sprin
ScopeNotActiveException в считывателе, процессоре и писателе при реализации секционированного задания Spring Batch ⇐ JAVA
Программисты JAVA общаются здесь
1736852186
Anonymous
Мне нужно создать пакетное задание Spring, которое считывает данные из базы данных A, преобразует данные и сохраняет их в базе данных B. Раньше мне удавалось создать неразделенное пакетное задание, но когда я пытаюсь реализовать секционированное задание, оно показывает некоторые исключения.
Вот мой код, и для примечания я оставил несколько комментариев, просто чтобы сообщить, что я что-то пробовал заранее, может быть, кто-то может дать мне немного понимание этого.
Итак, в .itemReader исключением является:
Метод throw
'org.springframework.beans.factory.support. ScopeNotActiveException'
исключение. Невозможно оценить
org.springframework.batch.item.data.RepositoryItemReader$$SpringCGLIB$$0.toString()
в .itemProcessor, исключением является:
Метод throw
'org.springframework.beans.factory.support.ScopeNotActiveException'
исключение. Невозможно вычислить jdk.proxy2.$Proxy108.toString()
в .itemWriter, исключение:
Метод выдал
'org.springframework.beans.factory.support.ScopeNotActiveException'
исключение. Невозможно оценить jdk.proxy2.$Proxy109.toString()
Почему-то процессор чтения и запись просто опускаются в коде, и если я использую отладчиком, я получаю эти исключения.
@Configuration
@EnableBatchProcessing
public class NewPartitionedBatchConfig {
private final CustomerRepository customerRepository;
private final SourceMigrationDataChanger sourceMigrationDataChanger;
public NewPartitionedBatchConfig(
CustomerRepository customerRepository,
SourceMigrationDataChanger sourceMigrationDataChanger
) {
this.customerRepository = customerRepository;
this.sourceMigrationDataChanger = sourceMigrationDataChanger;
}
@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}
@Bean
public Job sampleJob(
JobRepository jobRepository,
Step partitionedStep
) {
return new JobBuilder("sampleJob", jobRepository)
.start(partitionedStep)
.build();
}
// i've tried to declare it this way
// @Bean
// public Partitioner partitioner() {
// CustomPartitioner partitioner = new CustomPartitioner();
// partitioner.partition(GRID_SIZE);
// return partitioner;
// }
@Bean
public PartitionHandler partitionHandler(
JobExecutorConfig jobExecutorConfig,
Step workerStep
) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
// handler.setTaskExecutor(jobExecutorConfig.threadPoolTaskExecutor()); //i've made a ThreadPoolTaskExecutor too
handler.setTaskExecutor(jobExecutorConfig.simpleAsyncTaskExecutor());
handler.setStep(workerStep);
handler.setGridSize(GRID_SIZE);
return handler;
}
@Bean
public Step partitionedStep(
JobRepository jobRepository,
CustomPartitioner customPartitioner,
// BasicPartitioner basicPartitioner, //i've tried Basic Partitioner too
PartitionHandler partitionHandler
) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", customPartitioner)
// .step(workerStep()) //i've tried to put a step directly without partitionedHandler already
.partitionHandler(partitionHandler)
.build();
}
@Bean
public Step workerStep(
PlatformTransactionManager transactionManager,
JobRepository jobRepository,
RepositoryItemReader itemReader,
ItemProcessor itemProcessor,
ItemWriter itemWriter
) {
return new StepBuilder("workerStep", jobRepository)
.chunk(BATCH_SIZE, transactionManager)
.reader(itemReader)
.processor(itemProcessor)
.writer(itemWriter)
.build();
}
@Bean
@StepScope
public RepositoryItemReader itemReader(
@Value("#{stepExecutionContext['partitionIndex'] ?: 0}") Integer partitionIndex,
@Value("#{stepExecutionContext['gridSize']}") Integer gridSize
) {
if (partitionIndex == null) {
throw new IllegalStateException("partitionIndex is null. Ensure that the step execution context is properly set.");
}
RepositoryItemReader reader = new RepositoryItemReader();
reader.setRepository(customerRepository);
// reader.setMethodName("findCustomersByPage"); using JPA Pageable
reader.setMethodName("findCustomersByPartition");
reader.setArguments(Arrays.asList(partitionIndex, gridSize));
reader.setPageSize(BATCH_SIZE);
reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
return reader;
}
@Bean
@StepScope
public ItemProcessor itemProcessor() {
return customer -> {
sourceMigrationDataChanger.updateCustomerMigrated((Customer) customer);
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, customer);
return defaultKafkaHash;
};
}
@Bean
@StepScope
public ItemWriter itemWriter() {
return items -> {
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, items);
// it used to send some data through kafka, but omit this for now
};
}
}
Теперь мой пользовательский разделитель
@Getter
@Component
public class CustomPartitioner implements Partitioner {
// public Map partitions = new HashMap();
@Override
public Map partition(int gridSize) {
System.out.println("Partitioning with gridSize: " + gridSize);
gridSize=2;
Map partitions = new HashMap();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("partitionIndex", i);
context.putInt("gridSize", gridSize);
context.putInt("startIndex", i * BATCH_SIZE);
context.putInt("pageSize", BATCH_SIZE);
partitions.put(PARTITION_KEY + i, context);
}
return partitions;
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79354661/scopenotactiveexception-on-reader-processor-and-writer-while-implementing-sprin[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия