Сначала я покажу свой предыдущий рабочий код. Это не гарантирует одновременное выполнение не более двух потоков, даже если в пуле потоков всего два потока.
CompletableFuture taskFuture = idList.stream()
.map(id -> (Runnable) () -> processEntity(startTime, id))
.map(task -> CompletableFuture.runAsync(task, executorService))
.collect(Collectors.collectingAndThen(Collectors.toList(),
tasks -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))));
taskFuture.get();
Пересмотренная логика работы с семафором выглядит следующим образом:
var semaphore = new StreamSemaphore(MAX_CONCURRENT_EXECUTIONS);
var futureTasks = itemList.stream()
.map(semaphore::acquire)
.map(itemId -> (Supplier) () -> {
var result = processItem(startTime, itemId);
semaphore.release(null);
return result;
})
.map(task -> CompletableFuture.supplyAsync(task, taskExecutor)
.exceptionally(e -> {
log.error("Exception occurred while processing item", e);
System.exit(1);
return null;
}))
.toList();
futureTasks.forEach(TaskProcessor::safeGet);
где находится StreamSemaphore
public class StreamSemaphore {
private final Semaphore sema;
public StreamSemaphore(int slots) {
sema = new Semaphore(slots);
}
@SneakyThrows
public T acquire(T obj) {
sema.acquire();
return obj;
}
@SneakyThrows
public T release(T obj) {
sema.release();
return obj;
}
}
После обработки первых двух объектов выдается ошибка

org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is java.lang.IllegalStateException: EntityManagerFactory is closed
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:467)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)
at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)
at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:595)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:382)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708)
at com.xegara.process.PlanAccountRelationshipMaintenanceService$$EnhancerBySpringCGLIB$$dea4561a.handleInScopeCustomer()
at com.xegara.process.PlanAccountMaintenanceProcess.handleCustomerList(PlanAccountMaintenanceProcess.java:271)
at com.xegara.process.PlanAccountMaintenanceProcess.handleCustomerListTransaction(PlanAccountMaintenanceProcess.java:247)
at com.xegara.process.PlanAccountMaintenanceProcess.handlePlanAccountRelationshipForOwners(PlanAccountMaintenanceProcess.java:208)
at com.xegara.process.PlanAccountMaintenanceProcess.handlePlanAccountRelationshipForDepartment(PlanAccountMaintenanceProcess.java:156)
at com.xegara.process.PlanAccountMaintenanceProcess.handleDepartment(PlanAccountMaintenanceProcess.java:119)
at com.xegara.process.PlanAccountMaintenanceProcess.lambda$process$0(PlanAccountMaintenanceProcess.java:92)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run$$$capture(CompletableFuture.java:1768)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java)
at com.bytedance.cg.gcrm.overseas.common.util.executor.ContextPassThreadPoolTaskExecutor.lambda$null$1(ContextPassThreadPoolTaskExecutor.java:73)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: EntityManagerFactory is closed
at org.hibernate.internal.SessionFactoryImpl.validateNotClosed(SessionFactoryImpl.java:547)
at org.hibernate.internal.SessionFactoryImpl.createEntityManager(SessionFactoryImpl.java:636)
at org.hibernate.internal.SessionFactoryImpl.createEntityManager(SessionFactoryImpl.java:158)
at org.springframework.orm.jpa.AbstractEntityManagerFactoryBean.createNativeEntityManager(AbstractEntityManagerFactoryBean.java:585)
at jdk.internal.reflect.GeneratedMethodAccessor228.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.orm.jpa.AbstractEntityManagerFactoryBean.invokeProxyMethod(AbstractEntityManagerFactoryBean.java:487)
at org.springframework.orm.jpa.AbstractEntityManagerFactoryBean$ManagedEntityManagerFactoryInvocationHandler.invoke(AbstractEntityManagerFactoryBean.java:734)
at jdk.proxy2/jdk.proxy2.$Proxy191.createNativeEntityManager(Unknown Source)
at org.springframework.orm.jpa.JpaTransactionManager.createEntityManagerForTransaction(JpaTransactionManager.java:485)
at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:410)
... 21 common frames omitted
Подробнее здесь: https://stackoverflow.com/questions/793 ... -semaphore