Блокирующий (синхронный) упрощенный код, написанный в псевдокоде, будет выглядеть так:
Код: Выделить всё
// List of ids of printers which are fetched from a web service
List printerIds = List.of(1L, 2L, 3L, 4L) ;
// retrieve the printers using a rest call
List
printers = printersforIds(printerIds)
// create additional mapping created by calling yet another web service
Map mappingPrinterCodes = mappingPrinterCodes(printers.getAllCodes());
// In order to create a single PrinterResponse object I need to use values from single Printer object and from the mappingPrinterCodes
List printerGrid = printers.map(p-> { createResponse(p,mappingPrinterCodes )}

Я пытался написать это с использованием реактивных библиотек:
Код: Выделить всё
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
@Slf4j
public class MainReactive {
public static void main(String[] args) {
System.out.println("start");
MainReactive mainReactive = new MainReactive();
mainReactive.example();
}
public void example() {
Mono[*]> printerIds = Mono.just(List.of(1L, 2L, 3L, 4L));
Flux printers = printerIds.flatMapMany(this::fetchPrinters);
Mono mappingPrinterCodes = mappingPrinterCodes(printers);
Flux printerResponseFlux = printers.map(
printer -> Mono.zip(mappingPrinterCodes, mappingPrinterCodes).map(
tuple -> {
// here the important part is that I have to create new object based on both mappingPrinterCodes and the single printer itself
String code = printer.getCode();
return new PrinterResponse("result " + code + tuple.getT1().get(code));
}))
.flatMap(m -> m);
printerResponseFlux.subscribe(System.out::println);
}
public Flux fetchPrinters(List ids) {
return Flux.fromIterable(ids)
.map(id -> new Printer(id, "code " + id))
.doOnSubscribe(s -> log.info("P subscribed to fetchPrinters : {}", s.toString()))
.doOnTerminate(() -> log.info("P finished call to fetchPrinters "));
}
public Flux fetchPrinterCodes(List ids) {
return Flux.fromIterable(ids)
.map(id -> new PrinterCode(id, "code " + id))
.doOnSubscribe(s -> log.info("PC subscribed to fetchPrinterCodes : {}", s.toString()))
.doOnTerminate(() -> log.info("PC finished call to fetchPrinterCodes "));
}
private Mono mappingPrinterCodes(Flux printerFlux) {
return printerFlux.collectList().map(printers -> {
List printerCodes = printers.stream()
.map(Printer::getCode)
.toList();
Flux printerCodeFlux = fetchPrinterCodes(printerCodes);
return
printerCodeFlux.collectList()
.map(ppp -> ppp.
stream().
collect(
Collectors.toMap(
pcode -> pcode.getCode() + " from mapping",
Function.identity())));
})
.flatMap(mono -> mono);
}
}
Код: Выделить всё
start
16:02:16.751 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@22eeefeb
16:02:16.761 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@5579bb86
16:02:16.762 [main] INFO MainReactive -- P finished call to fetchPrinters
16:02:16.766 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@52525845
16:02:16.766 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
16:02:16.767 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@1f021e6c
16:02:16.768 [main] INFO MainReactive -- P finished call to fetchPrinters
16:02:16.768 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@103f852
16:02:16.768 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 1 | Printer code 1
16:02:16.773 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@61ca2dfa
- использовать разные подходы, вместо FlatMap использовать concatMap и т. д.
- Используйте Share() или кэш() на Flux
Какие части кода могут быть причина проблемы и почему это происходит?
Подробнее здесь: https://stackoverflow.com/questions/789 ... ux-flatmap