Проблема с параллельным потоком JavaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Проблема с параллельным потоком Java

Сообщение Anonymous »

У меня есть фрагмент кода, который работает синхронно, как ожидалось, но не так, как ожидалось, параллельно.

У меня есть метод для получения некоторых элементов из конечной точки, этот метод получает список номера элементов.
ради производительности я хочу разделить входной список на части и вызвать серверную часть с помощью этих подсписков

Код: Выделить всё

public List callBackend(List itemNumbers) {
try {

int chunkSize = 10;
var endExclusive = (itemNumbers.size() + chunkSize - 1) / chunkSize;

return IntStream.range(0, endExclusive)
.mapToObj(i -> itemNumbers.subList(i * chunkSize, Math.min((i + 1) * chunkSize, itemNumbers.size())))
.parallel()
.map(chunk -> apiClient.getItemInfo(chunk))
.flatMap(List::stream)
.toList();

} catch (Exception e) {
log.error("Error", e);
return List.of();
}
}
Юнит-тест

Код: Выделить всё

List items = IntStream.rangeClosed(start, end).mapToObj(x -> "item-" + x).toList();

ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(List.class);

when(apiClient.getItemInfo(argumentCaptor.capture()))
.thenAnswer(invocationOnMock -> {
System.out.println(argumentCaptor.getValue().stream().collect(Collectors.joining(",")));
Thread.currentThread().sleep(100);
return argumentCaptor.getValue().stream().map(x -> ItemDto.builder().itemNumber(x).build()).toList();
});

long startTime = System.currentTimeMillis();
var itemDtos = unitUnderTest.callBackend(items);
long duration = System.currentTimeMillis() - startTime;

System.out.println("Duration: " + duration);

items.forEach(x -> {
var list = itemDtos.stream().filter(y -> y.getItemNumber().equals(x)).toList();

if (list.size() > 1) {
System.out.println("Item number: " + x + " is duplicated");
}
assertEquals(1, list.size());
});
assertEquals(items.size(), itemDtos.size());
verify(apiClient, times(items.size() / 10 + 1)).getItemInfo(anyList());
Проблема: если я удалю параллельную функцию(), тест пройдет успешно и результат будет таким, как ожидалось. но если я верну параллельный() на место, то API будет вызываться с повторяющимися фрагментами (подсписком) случайным образом, иногда первый фрагмент иногда нет, я пробовал тот же подход с ForkAndJoin, а некоторые также перебирали фрагменты с CompletableFuture, но результаты были одинаковый. Я не понимаю, почему происходит такое поведение.
есть идеи, как решить эту проблему. эта проблема всегда возникает, когда размер входного списка не превышает 10 (размер фрагмента).

Подробнее здесь: https://stackoverflow.com/questions/790 ... ream-issue
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»