Java Stream forEachOrdered в параллельном неупорядоченном потоке потребляет в исходном порядкеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Java Stream forEachOrdered в параллельном неупорядоченном потоке потребляет в исходном порядке

Сообщение Anonymous »

Недавно я реализовал определенный алгоритм: один потребитель и несколько производителей, используя очередь блокировки для обмена состоянием. При реализации упрощенного варианта я думал, что, возможно, смогу заставить его работать, используя базовую реализацию потока Java.
Но это не сработало так, как ожидалось. Пример, воспроизводящий неожиданное поведение:

Код: Выделить всё

LongStream.range(0, 32)
.unordered()
.parallel()
.map(value -> {
// 'do some work', which can vary in duration
try {
Thread.sleep(new Random(value).nextLong(1000));
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
System.out.println("map " + value);
return value;
}).forEachOrdered(value -> {
// actual implementation writes to output stream
System.out.println(">>> " + value);
System.out.flush();
});
Моя первоначальная идея заключалась в том, что как только каждая карта будет завершена, она может быть использована терминальной операцией (примерно). А forEachOrdered будет обрабатывать один элемент раньше другого (поэтому нет необходимости в ручной синхронизации), по сравнению с использованием forEach. Но когда я запускаю пример кода, я получаю следующий результат:

Код: Выделить всё

map 7
map 29
map 8
map 11
map 28
map 21
map 20
map 13
map 6
map 10
map 1
map 15
map 5
map 24
map 26
map 14
map 9
map 2
map 31
map 17
map 22
map 16
map 3
map 23
map 12
map 27
map 0
>>> 0
>>> 1
>>> 2
>>> 3
map 4
>>> 4
>>> 5
>>> 6
>>> 7
>>> 8
>>> 9
>>> 10
>>> 11
>>> 12
>>> 13
>>> 14
>>> 15
>>> 16
>>> 17
map 25
map 30
map 18
>>> 18
map 19
>>> 19
>>> 20
>>> 21
>>> 22
>>> 23
>>> 24
>>> 25
>>> 26
>>> 27
>>> 28
>>> 29
>>> 30
>>> 31
Как видите, элементы отображаются не по порядку, но операция терминала вызывается в порядке источника потока. Вы можете усугубить проблему, если просто будете спать очень долго при обнаружении значения 0 и не спать при всех других значениях. Однако порядок не имеет значение.
Это меня смущает, поскольку я объявляю поток неупорядоченным. И, насколько я понимаю, forEachOrdered не обязательно обрабатывает по порядку при использовании с неупорядоченным потоком.
Конечно, это не значит, что это невозможно, поэтому текущее поведение, насколько я могу сказать, это совершенно нормально. В моем случае это просто невозможно.
Я что-то не понимаю? Стоит ли этого ожидать, и если да, то чем это вызвано и почему? Можно ли улучшить реализацию Streams?
Обновление:
В ответ на вопросы немного подробнее:
В моей исходной реализации у меня есть очередь задач, содержащая задачи, потоки-производители берут задачи из этой очереди и помещают результаты в одну потребительскую очередь. Каждый поток-производитель обрабатывает множество задач, поэтому также будет создавать несколько значений. Потребительский поток обрабатывает по одному результату за раз, что важно, поскольку он записывает в выходной поток, поэтому вызовы должны быть сериализованы/синхронизированы.
Для реализации потока моя идея заключалась в том, чтобы потоковые задачи, пусть производители будут задачами карты и записью на вывод при вызове терминала. Причина, по которой я использовал forEachOrdered, заключается в том, что он обещает
обрабатывать по одному элементу за раз. Но он не обещает порядка, если поток не имеет определенного порядка встреч, и это нормально. Этот последний бит, вместе с объявлением потока неупорядоченным и параллельным, заставил меня подумать, что он должен допускать неупорядоченную параллельную обработку. Но это не так.
Может быть, это просто деталь реализации, почему сейчас этого не происходит. Или, может быть, я неправильно понимаю. Обратите внимание: если я изменю forEachOrdered на forEach и синхронизирую внутри forEach, все будет работать так, как ожидалось.

Подробнее здесь: https://stackoverflow.com/questions/792 ... urce-order
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»