Как видно из исходного кода, я сначала получаю общее количество строк, присутствующих в базе данных, прежде чем выполнять внутреннюю разбивку по страницам и асинхронное получение данных через потоки, затем эти потоки реорганизуются, и их данные объединяются в выходной поток. У меня такое впечатление, что именно с этой частью я не справился, потому что данные в потоке перекрываются. Ваша помощь будет мне очень полезна.
Исходный код сервиса:
Код: Выделить всё
@Override
public Flux getAllFromDifferentiel() throws InterruptedException, ExecutionException {
logger.info("Appel de la methode getAllFromDifferentiel()");
//Initialisation du flux de sortie
returnedFlux = Flux.empty();
//Recuperation du total de ligne
Mono totalRecords = repository.count();
int total = totalRecords.toFuture().get();
//Debut des calculs
int relicat = total % limit;
totalPage = (total / limit) + (relicat > 0 ? 1 : 0);
logger.info("Total page possible: " + totalPage);
Thread[] tasks = new Thread[totalPage];
for (int indexPage = 0; indexPage < totalPage; indexPage++) {
//Lancement en parallèle des requetes de pagination
int finalIndexPage = indexPage;
tasks[indexPage] = new Thread(() -> {
offset = finalIndexPage == 0 ? 0 : limit * finalIndexPage + 1;
logger.info("Page = " + (finalIndexPage + 1) + " Offset =" + offset);
//Regroupement des flux générés apres execution de la requete
returnedFlux = Flux.concat(returnedFlux , repository.getAllByPage(limit, offset).map(ObjectReturn.INSTANCE::mapToDTO));
});
tasks[indexPage].start();
//Agencement ordonné des tâches
try {
tasks[indexPage].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return returnedFlux
.doOnError(error -> {
logger.error("--------- Stream error ------------" + error.getMessage(), error);
throw new IllegalStateException(error);
})
.doOnCancel(() -> {
logger.info("--------- Stream canceled -------------");
})
.doOnComplete(() -> {
logger.info("--------- Stream completed -------------");
});
}
Код: Выделить всё
@GetMapping(value = "getData", produces = MediaType.APPLICATION_NDJSON_VALUE)
public Flux getAllFromDifferentiel() throws InterruptedException, ExecutionException {
return service.getAllFromDifferentiel();
}
Полезная нагрузка ответа
Как вы можете видеть в json, открытом с помощью VsCode, некоторые данные искажены, что приводит к некорректности конечного файла json. И эта проблема возникает на нескольких уровнях результирующего файла.
Подробнее здесь: https://stackoverflow.com/questions/787 ... ng-webflux