Сначала я покажу свой предыдущий рабочий код. Это не гарантирует одновременное выполнение двух потоков, даже если в пуле потоков всего два потока.
Код: Выделить всё
CompletableFuture taskFuture = idList.stream()
.map(id -> (Runnable) () -> processEntity(startTime, id))
.map(task -> CompletableFuture.runAsync(task, executorService))
.collect(Collectors.collectingAndThen(Collectors.toList(),
tasks -> CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]))));
taskFuture.get();
Код: Выделить всё
var semaphore = new StreamSemaphore(MAX_CONCURRENT_EXECUTIONS);
var futureTasks = itemList.stream()
.map(semaphore::acquire)
.map(itemId -> (Supplier) () -> processItem(startTime, itemId))
.map(task -> CompletableFuture.supplyAsync(task, taskExecutor)
.thenAccept(semaphore::release)
.exceptionally(e -> {
log.error("Exception occurred while processing item", e);
System.exit(1);
return null;
}))
.toList();
futureTasks.forEach(TaskProcessor::safeGet);
Код: Выделить всё
public class StreamSemaphore {
private final Semaphore sema;
public StreamSemaphore(int slots) {
sema = new Semaphore(slots);
}
@SneakyThrows
public T acquire(T obj) {
sema.acquire();
return obj;
}
@SneakyThrows
public T release(T obj) {
sema.release();
return obj;
}
}

Подробнее здесь: https://stackoverflow.com/questions/793 ... -semaphore