Заранее извиняюсь за стену текста. Это для JDK 17.
В настоящее время я тестирую методы со следующими сценариями:
- Внешний параллельный поток и внутренний вложенный параллельный поток, который использует ManagedBlocker для вызова Thread::sleep
- Внешний параллельный поток и внутренний вложенный параллельный поток, который использует другой FJPool, но также использует ManagedBlocker для вызова Thread::sleep
Код: Выделить всё
public class NestedPerf {
private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();
public static void main(String[] args){
// testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
// testInnerParallelLoopWithManagedBlock(2);
}
public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
Map threads = new ConcurrentHashMap();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
public static void testInnerParallelLoopWithManagedBlock(int limit){
Map threads = new ConcurrentHashMap();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerParallelLoopWithManagedBlock();
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
}
Код: Выделить всё
java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
Изначально я думал, что проблема может быть связана с взаимодействием между вложенными параллельными потоками и ManagedBlocker, действующим в одном и том же пуле. Поэтому я создал еще один метод тестирования, в котором я использую один и тот же общий пользовательский FJPool как для внутренних, так и для внешних циклов:
Код: Выделить всё
public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
Map threads = new ConcurrentHashMap();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
})).join();
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+ threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
Есть ли у кого-нибудь представление о том, что происходит за кулисами?
Подробнее здесь: https://stackoverflow.com/questions/792 ... lelstreams