RejectedExecutionException при использовании ManagedBlocker во вложенных параллельных потокахJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 RejectedExecutionException при использовании ManagedBlocker во вложенных параллельных потоках

Сообщение Anonymous »

Это продолжение этой публикации: Почему внутренние параллельные потоки в этом сценарии выполняются быстрее с новыми пулами, чем с общим пулом?
Заранее извиняюсь за стену текста. Это для JDK 17.
В настоящее время я тестирую методы со следующими сценариями:
  • Внешний параллельный поток и внутренний вложенный параллельный поток, который использует ManagedBlocker для вызова Thread::sleep
  • Внешний параллельный поток и внутренний вложенный параллельный поток, который использует другой FJPool, но также использует ManagedBlocker для вызова Thread::sleep
Вот как выглядит код:

Код: Выделить всё

public class NestedPerf {
private static final ForkJoinPool sharedInnerPool = new ForkJoinPool();

public static void main(String[] args){
//    testInnerParallelLoopWithSharedPoolAndManagedBlock(10000);
//    testInnerParallelLoopWithManagedBlock(2);
}

public static void testInnerParallelLoopWithSharedPoolAndManagedBlock(int limit){
Map threads = new ConcurrentHashMap();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}

public static void testInnerParallelLoopWithManagedBlock(int limit){
Map threads = new ConcurrentHashMap();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
StateData.outerLoop.parallelStream().unordered().forEach(i -> {
innerParallelLoopWithManagedBlock();
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.put(count, count);
});
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
}
Как сказано в заголовке, метод testInnerParallelLoopWithManagedBlock прерывается со следующим исключением:

Код: Выделить всё

java.util.concurrent.RejectedExecutionException: java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
Но метод testInnerParallelLoopWithSharedPoolAndManagedBlock работает нормально до числа 10 000 (а возможно и больше, но я не тестировал больше 10 000).
Изначально я думал, что проблема может быть связана с взаимодействием между вложенными параллельными потоками и ManagedBlocker, действующим в одном и том же пуле. Поэтому я создал еще один метод тестирования, в котором я использую один и тот же общий пользовательский FJPool как для внутренних, так и для внешних циклов:

Код: Выделить всё

public static void testBothLoopsWithSharedPoolAndManagedBlock(int limit){
Map threads = new ConcurrentHashMap();
boolean threwException = false;
for(int c = 0; c < limit; c++) {
try {
sharedInnerPool.submit(()->StateData.outerLoop.parallelStream().unordered().forEach(i -> {
sharedInnerPool.submit(() -> StateData.innerLoop.parallelStream().unordered().forEach(j -> {
try {
ForkJoinPool.managedBlock(new SleepManagedBlocker(Duration.ofMillis(5)));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
})).join();
int count = sharedInnerPool.getActiveThreadCount();
threads.put(count, count);
})).join();
} catch (Exception e) {
System.out.println(e.getMessage());
threwException = true;
int count = ForkJoinPool.commonPool().getActiveThreadCount();
threads.clear();
threads.put(count, count);
break;
}
}
if(threwException){
System.out.println("Exception thrown. Max Thread Count = "+  threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
} else {
System.out.println( "Max Thread Count = " +threads.keySet().stream().max(Comparator.comparingInt(x->x)).orElse(-1));
}
}
Но этот работает отлично. Таким образом, проблема, похоже, связана именно с ForkJoinPool.commonPool().
Есть ли у кого-нибудь представление о том, что происходит за кулисами?

Подробнее здесь: https://stackoverflow.com/questions/792 ... lelstreams
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • RejectedExecutionException при использовании ManagedBlocker во вложенных параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous
  • RejectedElementException при использовании ManagedBlocker во вложенных параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Почему libarchive замедляется при одновременном использовании в параллельных потоках? [закрыто]
    Anonymous » » в форуме C++
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Kafka Consumer.poll занимает разное время в параллельных потоках
    Anonymous » » в форуме JAVA
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Как обеспечить порядок обработки в параллельных потоках?
    Anonymous » » в форуме JAVA
    0 Ответы
    9 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»