Создание многопоточной многопоточной работы казалось легким и простым. Процессор является пользовательским реализацией Itemprocessor. < /P>
Я мог бы сделать этот шаг многопоточной, создав Threadpooltaskexecutor и добавив его на шаг. Хотя я получил несколько разных исключений, все связаны с ленивой коллекцией в моем доменном классе, которую читают и обрабатывают. < /P>
java.lang.IllegalStateException: Session/EntityManager is closed
java.lang.NullPointerException: Cannot invoke "org.hibernate.engine.spi.SharedSessionContractImplementor.getPersistenceContext()" because "this.session" is null
java.lang.ArrayIndexOutOfBoundsException: Index 331 out of bounds for length 331
org.hibernate.LazyInitializationException: failed to lazily initialize a collection of role: com.company.app.domain.CustomObject.aListOfItems: could not initialize proxy - no Session
< /code>
Я не уверен, есть ли способ исправить это, не делая коллекцию нетерпеливой загрузкой. < /p>
Кажется, что Itemreader закрывает сеанс Hibernate до того, как элемент -процессор завершит обработку элементов базы данных. < /p>
@Bean
public Job batchJob(
final Step batchStep,
final Step step2,
final Step step3,
final Step step4,
final Step step5
) {
return new JobBuilder("batchJob", jobRepository)
.incrementer(new RunIdIncrementer())
.start(batchStep)
.next(step1)
.next(step2)
.next(step3)
.next(step4)
.next(step5)
.build();
}
@Bean
public Step batchStep(
final ItemReader customObjectReader,
final CustomObjectProcessor customObjectProcessor,
final TaskExecutor batchTaskExecutor
) {
return new StepBuilder("batchStep", jobRepository)
.chunk(100, transactionManager)
.reader(customObjectReader)
.processor(customObjectProcessor)
.writer(chunk -> {})
.taskExecutor(batchTaskExecutor)
.throttleLimit(4)
.build();
}
@Bean
@StepScope
public ItemReader customObjectReader(
final CustomObjectRepository customObjectRepository,
@Value("#{jobParameters['year']}") final Integer year,
@Value("#{jobParameters['month']}") final Integer month
) {
final RepositoryItemReader reader = new RepositoryItemReader();
reader.setRepository(customObjectRepository);
reader.setMethodName("findAllByYearAndMonth");
reader.setArguments(List.of(year, month));
reader.setSort(Map.of("id", Sort.Direction.ASC));
reader.setPageSize(100);
reader.setSaveState(false);
return reader;
}
@Bean
public TaskExecutor batchTaskExecutor() {
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(20);
executor.setThreadNamePrefix("Batch-");
executor.initialize();
return executor;
}
@Component
@StepScope
public class CustomObjectProcessor implements ItemProcessor, ItemStream {
@Override
public CustomObject process(final CustomObject customObject) {
log.info("Processing customObject Id={}", customObject.getId());
customObject.updateData();
return customObject;
}
}
@Repository
@Transactional(readOnly = true)
public interface CustomObjectRepository extends JpaRepository, QuerydslPredicateExecutor {
Page findAllByYearAndMonth(Integer year, Integer month, Pageable pageable);
}
< /code>
Я попытался сделать запрос репозитория, чтобы присоединиться к Fetch, но это, похоже, не работает. Он по -прежнему выполняет SQL Select при чтении коллекции. < /P>
@Query("""
SELECT co FROM CustomObject co
JOIN FETCH co.aListOfItems loi
WHERE co.year = :year
AND co.month = :month
""")
Page findAllByYearAndMonth(Integer year, Integer month, Pageable pageable);
Подробнее здесь: https://stackoverflow.com/questions/795 ... processing