ScopeNotActiveException в считывателе, процессоре и писателе при реализации секционированного задания Spring BatchJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 ScopeNotActiveException в считывателе, процессоре и писателе при реализации секционированного задания Spring Batch

Сообщение Anonymous »

Мне нужно создать пакетное задание Spring, которое считывает данные из базы данных A, преобразует данные и сохраняет их в базе данных B. Раньше мне удавалось создать неразделенное пакетное задание, но когда я пытаюсь реализовать секционированное задание, оно показывает некоторые исключения.
Вот мой код, и для примечания я оставил несколько комментариев, просто чтобы сообщить, что я что-то пробовал заранее, может быть, кто-то может дать мне немного понимание этого.
Итак, в .itemReader исключением является:

Метод throw
'org.springframework.beans.factory.support. ScopeNotActiveException'
исключение. Невозможно оценить
org.springframework.batch.item.data.RepositoryItemReader$$SpringCGLIB$$0.toString()

в .itemProcessor, исключением является:

Метод throw
'org.springframework.beans.factory.support.ScopeNotActiveException'
исключение. Невозможно вычислить jdk.proxy2.$Proxy108.toString()

в .itemWriter, исключение:

Метод выдал
'org.springframework.beans.factory.support.ScopeNotActiveException'
исключение. Невозможно оценить jdk.proxy2.$Proxy109.toString()

Почему-то процессор чтения и запись просто опускаются в коде, и если я использую отладчиком, я получаю эти исключения.
@Configuration
@EnableBatchProcessing
public class NewPartitionedBatchConfig {

private final CustomerRepository customerRepository;
private final SourceMigrationDataChanger sourceMigrationDataChanger;

public NewPartitionedBatchConfig(
CustomerRepository customerRepository,
SourceMigrationDataChanger sourceMigrationDataChanger
) {
this.customerRepository = customerRepository;
this.sourceMigrationDataChanger = sourceMigrationDataChanger;
}

@Bean
public PlatformTransactionManager transactionManager(DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean
public Job sampleJob(
JobRepository jobRepository,
Step partitionedStep
) {
return new JobBuilder("sampleJob", jobRepository)
.start(partitionedStep)
.build();
}

// i've tried to declare it this way
// @Bean
// public Partitioner partitioner() {
// CustomPartitioner partitioner = new CustomPartitioner();
// partitioner.partition(GRID_SIZE);
// return partitioner;
// }

@Bean
public PartitionHandler partitionHandler(
JobExecutorConfig jobExecutorConfig,
Step workerStep
) {
TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
// handler.setTaskExecutor(jobExecutorConfig.threadPoolTaskExecutor()); //i've made a ThreadPoolTaskExecutor too
handler.setTaskExecutor(jobExecutorConfig.simpleAsyncTaskExecutor());
handler.setStep(workerStep);
handler.setGridSize(GRID_SIZE);
return handler;
}

@Bean
public Step partitionedStep(
JobRepository jobRepository,
CustomPartitioner customPartitioner,
// BasicPartitioner basicPartitioner, //i've tried Basic Partitioner too
PartitionHandler partitionHandler
) {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", customPartitioner)
// .step(workerStep()) //i've tried to put a step directly without partitionedHandler already
.partitionHandler(partitionHandler)
.build();
}

@Bean
public Step workerStep(
PlatformTransactionManager transactionManager,
JobRepository jobRepository,
RepositoryItemReader itemReader,
ItemProcessor itemProcessor,
ItemWriter itemWriter
) {
return new StepBuilder("workerStep", jobRepository)
.chunk(BATCH_SIZE, transactionManager)
.reader(itemReader)
.processor(itemProcessor)
.writer(itemWriter)
.build();
}

@Bean
@StepScope
public RepositoryItemReader itemReader(
@Value("#{stepExecutionContext['partitionIndex'] ?: 0}") Integer partitionIndex,
@Value("#{stepExecutionContext['gridSize']}") Integer gridSize
) {
if (partitionIndex == null) {
throw new IllegalStateException("partitionIndex is null. Ensure that the step execution context is properly set.");
}

RepositoryItemReader reader = new RepositoryItemReader();
reader.setRepository(customerRepository);
// reader.setMethodName("findCustomersByPage"); using JPA Pageable
reader.setMethodName("findCustomersByPartition");
reader.setArguments(Arrays.asList(partitionIndex, gridSize));
reader.setPageSize(BATCH_SIZE);
reader.setSort(Collections.singletonMap("id", Sort.Direction.ASC));
return reader;
}

@Bean
@StepScope
public ItemProcessor itemProcessor() {
return customer -> {
sourceMigrationDataChanger.updateCustomerMigrated((Customer) customer);

Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, customer);
return defaultKafkaHash;
};
}

@Bean
@StepScope
public ItemWriter itemWriter() {
return items -> {
Map defaultKafkaHash = new HashMap();
defaultKafkaHash.put(KEY_DATA, items);
// it used to send some data through kafka, but omit this for now
};
}
}

Теперь мой пользовательский разделитель
@Getter
@Component
public class CustomPartitioner implements Partitioner {

// public Map partitions = new HashMap();

@Override
public Map partition(int gridSize) {
System.out.println("Partitioning with gridSize: " + gridSize);

gridSize=2;

Map partitions = new HashMap();
for (int i = 0; i < gridSize; i++) {
ExecutionContext context = new ExecutionContext();
context.putInt("partitionIndex", i);
context.putInt("gridSize", gridSize);
context.putInt("startIndex", i * BATCH_SIZE);
context.putInt("pageSize", BATCH_SIZE);

partitions.put(PARTITION_KEY + i, context);
}
return partitions;
}
}


Подробнее здесь: https://stackoverflow.com/questions/793 ... ting-sprin
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»