Java Stream forEachOrdered в параллельном неупорядоченном потоке потребляет в исходном порядкеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Java Stream forEachOrdered в параллельном неупорядоченном потоке потребляет в исходном порядке

Сообщение Anonymous »

Недавно я реализовал определенный алгоритм: один потребитель и несколько производителей, используя очередь блокировки для обмена состоянием. При реализации упрощенного варианта я думал, что, возможно, смогу заставить его работать, используя базовую реализацию потока Java.
Но это не сработало так, как ожидалось. Пример, воспроизводящий неожиданное поведение:

Код: Выделить всё

LongStream.range(0, 32)
.unordered()
.parallel()
.map(value -> {
// 'do some work', which can vary in duration
try {
Thread.sleep(new Random(value).nextLong(1000));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("map " + value);
return value;
}).forEachOrdered(value -> {
// actual implementation writes to output stream
System.out.println(">>> " + value);
System.out.flush();
});
Моя первоначальная идея заключалась в том, что по завершении каждой карты ее можно будет обработать. А forEachOrdered будет обрабатывать один элемент раньше другого (поэтому нет необходимости в ручной синхронизации), по сравнению с использованием forEach. Но когда я запускаю пример кода, я получаю следующий результат:

Код: Выделить всё

map 7
map 29
map 8
map 11
map 28
map 21
map 20
map 13
map 6
map 10
map 1
map 15
map 5
map 24
map 26
map 14
map 9
map 2
map 31
map 17
map 22
map 16
map 3
map 23
map 12
map 27
map 0
>>> 0
>>> 1
>>> 2
>>> 3
map 4
>>> 4
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> 10
>>> 11
>>> 12
>>> 13
>>> 14
>>> 15
>>> 16
>>> 17
map 25
map 30
map 18
>>> 18
map 19
>>> 19
>>> 20
>>> 21
>>> 22
>>> 23
>>> 24
>>> 25
>>> 26
>>> 27
>>> 28
>>> 29
>>> 30
>>> 31
Как видите, элементы обрабатываются не по порядку, но операция терминала вызывается в порядке источника потока. Вы можете усугубить проблему, просто засыпая на очень долгое время при обнаружении значения 0 и не засыпая для всех остальных значений.
Это меня смущает, потому что я объявляю поток неупорядоченным. . И, насколько я понимаю, forEachOrdered не обязательно обрабатывает по порядку при использовании с неупорядоченным потоком.
Конечно, это не значит, что это невозможно, поэтому текущее поведение, насколько я могу сказать, это совершенно нормально. В моем случае это просто невозможно.
Я что-то не понимаю? Стоит ли этого ожидать, и если да, то чем это вызвано и почему? Можно ли улучшить реализацию Streams?

Подробнее здесь: https://stackoverflow.com/questions/792 ... urce-order
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»