Почему пулы потоков так значительно медленнее, чем ParallelStream для этого сценария?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Почему пулы потоков так значительно медленнее, чем ParallelStream для этого сценария?

Сообщение Anonymous »

Это для Java 23, но я также повторил это на Java 21 и 17.

Код: Выделить всё

Executors.newFixedThreadPool

[*]

Код: Выделить всё

Executors.newWorkStealingPool
[*]A manually generated ForkJoinPool
[*]

Код: Выделить всё

ForkJoinPool.commonPool
[*]A parallelStream() реализация, и
[*] Последовательный цикл, который будет действовать как базовый случай

И это расчет, который я испытываю в центре:

Код: Выделить всё

    @State(Scope.Benchmark)
private static class StateData{
public static final List models = IntStream.range(1,1000_001)
.mapToObj(x->{
double length = 10.0; // meters
// ContainerClass is a record with two double type args
return new ContainerClass(beamLength, x);
}).toList();
}

Callable getCallable(ContainerClass x){
return ()-> x.length()*x.load()/2.0;
}
< /code>
Код эталона ниже: < /p>
    @Benchmark
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public void testingParallelStream_toList(Blackhole bh){
var midMoments = StateData.models.parallelStream()
.unordered()
.map(x-> x.load()*x.length()/2.0).toList();

bh.consume(midMoments);
}

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public void testingSequential(Blackhole bh) {
List results = new ArrayList();
for(var x: StateData.models){
var result = x.load()*x.length()/2.0;
results.add(result);
}
bh.consume(results);
}

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public void testingExecutorService_FixedThreadPool(Blackhole bh) throws InterruptedException {
var pool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
runPool(bh, pool);
}

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public void testingExecutorService_WorkStealingPool(Blackhole bh) throws InterruptedException {
var pool = Executors.newWorkStealingPool(Runtime.getRuntime().availableProcessors());
runPool(bh, pool);
}

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public void testingManualFJPool(Blackhole bh) throws InterruptedException, ExecutionException {
var pool = new ForkJoinPool();
runPool(bh, pool);
}

@Benchmark
@Fork(value = 1)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public void testingCommonFJPool(Blackhole bh) throws InterruptedException, ExecutionException {
var pool = ForkJoinPool.commonPool();
List results = new ArrayList();

List callables = new ArrayList();

for(var m: StateData.models){
callables.add(getCallable(m));
}

var futures = pool.invokeAll(callables);
boolean isFinished = pool.awaitQuiescence(60L, TimeUnit.MINUTES);
if(!isFinished){
throw new IllegalArgumentException("Timeout");
} else {
for(var future: futures){
results.add(future.get());
}
}
bh.consume(results);
}
private void runPool(Blackhole bh, ExecutorService pool) throws InterruptedException {
List results = new ArrayList();

List  callables = new ArrayList();

for(var m: StateData.models){
callables.add(getCallable(m));
}

var futures = pool.invokeAll(callables);
// wait for thread runs
pool.shutdown();
try{
boolean isFinished = pool.awaitTermination(60L, TimeUnit.MINUTES);
if(isFinished){
for(var future: futures){
results.add(future.get());
}
bh.consume(results);
} else {
throw new IllegalArgumentException("Timeout occurred before all threads could finish");
}
} catch(Exception e){
throw new IllegalArgumentException(e);
}
}
< /code>
Я ожидал, что потоки будут медленнее, чем ParallelStream, из-за лучшего разделения задач в последнем, но результаты немного удивительны: < /p>
Benchmark                                                                  Mode  Cnt   Score   Error  Units
FunctionalVsImperative.PerfTests.testingCommonFJPool                      thrpt    5  10.859 ± 0.937  ops/s
FunctionalVsImperative.PerfTests.testingExecutorService_FixedThreadPool   thrpt    5   5.605 ± 0.907  ops/s
FunctionalVsImperative.PerfTests.testingExecutorService_WorkStealingPool  thrpt    5  10.278 ± 0.430  ops/s
FunctionalVsImperative.PerfTests.testingManualFJPool                      thrpt    5   9.875 ± 1.709  ops/s
FunctionalVsImperative.PerfTests.testingParallelStream_toList             thrpt    5  74.648 ± 4.755  ops/s
FunctionalVsImperative.PerfTests.testingSequential                        thrpt    5  46.895 ± 6.828  ops/s
не только платы за потоками гораздо медленнее, чем ParallelStream , они даже медленнее, чем базовый последовательный цикл! Я протестировал это в течение 1000, 10000 и 100000 задач/итераций, и эта тенденция остается прежней. Создание потоков меньше, чем польза от параллельных прогонов) < /p>
У кого -то есть представление о том, почему этот эталон ведет себя таким образом? Если это помогает, я запускаю это в системе Core i7 10850H с 12 доступными процессорами (Hexcore + HyperThreading).
Edit (23/04/25) : после некоторого обсуждения в комментариях (см. Ниже).

Код: Выделить всё

    @State(Scope.Benchmark)
private static class StateData{
public static final List models = IntStream.range(1,1000_001)
.mapToObj(x->{
double beamLength = 10.0; // meters
return new ContainerClass(beamLength, x);
}).toList();

public static final List getCallables(){
List callables = new ArrayList();
for(var m: StateData.models){
callables.add(StateData.getCallable(m));
}
return callables;
}

public static Callable getCallable(ContainerClass x){
return ()-> x.length()*x.load()/2.0;
}
}
Цель состояла в том, чтобы удалить любые накладные расходы, понесенные из создания одного миллиона Callible объектов. Метод Runpool () также был изменен:

Код: Выделить всё

private void runPool(Blackhole bh, ExecutorService pool) throws InterruptedException {
List results = new ArrayList();
var futures = pool.invokeAll(StateData.getCallables());
// ... rest of the code
}
К сожалению, это абсолютно ничего не сделало для улучшения эталона, результаты которого остаются очень похожими на то, что я видел раньше (поэтому я не добавил их здесь)

Подробнее здесь: https://stackoverflow.com/questions/795 ... his-scenar
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Python: запускайте многопроцессорные пулы и запускайте собственные многопроцессорные пулы.
    Anonymous » » в форуме Python
    0 Ответы
    46 Просмотры
    Последнее сообщение Anonymous
  • Создайте отдельные пулы потоков для пула потоков платформы в Java
    Anonymous » » в форуме JAVA
    0 Ответы
    62 Просмотры
    Последнее сообщение Anonymous
  • Метод ParallelStream().forEach() не обрабатывает все данные [закрыто]
    Гость » » в форуме JAVA
    0 Ответы
    6 Просмотры
    Последнее сообщение Гость
  • Java ParallelStream с Spring Data JPA
    Anonymous » » в форуме JAVA
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Java HashMap и синхронизация с ParallelStream
    Anonymous » » в форуме JAVA
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»