The blocking (synchronous) version, simplified and written as pseudocode, would look like this:
Code:
// List of ids of printers which are fetched from a web service
List<Long> printerIds = List.of(1L, 2L, 3L, 4L);
// Retrieve the printers using a REST call
List<Printer> printers = printersforIds(printerIds);
// Build an additional mapping by calling yet another web service
Map<String, PrinterCode> mappingPrinterCodes = mappingPrinterCodes(printers.getAllCodes());
// To create a single PrinterResponse I need values from both the single Printer and mappingPrinterCodes
List<PrinterResponse> printerGrid = printers.map(p -> createResponse(p, mappingPrinterCodes));
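
For reference, the pseudocode above could be made runnable roughly as follows. This is only a sketch: the `Printer`, `PrinterCode` and `PrinterResponse` records, the stubbed bodies of `printersforIds` and `mappingPrinterCodes`, and the `printerGrid` helper are assumptions standing in for the real web-service calls, which the post does not show.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MainBlocking {

    // Hypothetical stand-ins for types the pseudocode does not define.
    record Printer(Long id, String code) {}
    record PrinterCode(String code, String description) {}
    record PrinterResponse(String value) {}

    // Stub for the REST call that loads printers by id (assumed behavior).
    static List<Printer> printersforIds(List<Long> ids) {
        return ids.stream().map(id -> new Printer(id, "code " + id)).toList();
    }

    // Stub for the second web service: one PrinterCode per code, keyed by that code.
    static Map<String, PrinterCode> mappingPrinterCodes(List<String> codes) {
        return codes.stream()
                .map(code -> new PrinterCode(code, code + " from mapping"))
                .collect(Collectors.toMap(PrinterCode::code, Function.identity()));
    }

    // Combines a single printer with the shared mapping.
    static PrinterResponse createResponse(Printer p, Map<String, PrinterCode> mapping) {
        return new PrinterResponse(
                "result " + p.code() + " | " + mapping.get(p.code()).description());
    }

    // Each "web service" is called exactly once, then the results are joined.
    static List<PrinterResponse> printerGrid(List<Long> ids) {
        List<Printer> printers = printersforIds(ids);
        List<String> codes = printers.stream().map(Printer::code).toList();
        Map<String, PrinterCode> mapping = mappingPrinterCodes(codes);
        return printers.stream().map(p -> createResponse(p, mapping)).toList();
    }

    public static void main(String[] args) {
        printerGrid(List.of(1L, 2L, 3L, 4L)).forEach(System.out::println);
    }
}
```

The point of the blocking version is that each service is called once and the mapping is reused for every printer; that is the behavior the reactive rewrite is meant to preserve.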

I tried to write this using reactive libraries:
Code:
import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class MainReactive {

    public static void main(String[] args) {
        System.out.println("start");
        MainReactive mainReactive = new MainReactive();
        mainReactive.example();
    }

    public void example() {
        Mono<List<Long>> printerIds = Mono.just(List.of(1L, 2L, 3L, 4L));
        Flux<Printer> printers = printerIds.flatMapMany(this::fetchPrinters);
        Mono<Map<String, PrinterCode>> mappingPrinterCodes = mappingPrinterCodes(printers);
        Flux<PrinterResponse> printerResponseFlux = printers.map(
                        printer -> Mono.zip(mappingPrinterCodes, mappingPrinterCodes).map(
                                tuple -> {
                                    // The important part: the new object must be built from both
                                    // mappingPrinterCodes and the single printer itself
                                    String code = printer.getCode();
                                    return new PrinterResponse("result " + code + tuple.getT1().get(code));
                                }))
                .flatMap(m -> m);
        printerResponseFlux.subscribe(System.out::println);
    }

    public Flux<Printer> fetchPrinters(List<Long> ids) {
        return Flux.fromIterable(ids)
                .map(id -> new Printer(id, "code " + id))
                .doOnSubscribe(s -> log.info("P subscribed to fetchPrinters : {}", s.toString()))
                .doOnTerminate(() -> log.info("P finished call to fetchPrinters "));
    }

    public Flux<PrinterCode> fetchPrinterCodes(List<String> ids) {
        return Flux.fromIterable(ids)
                .map(id -> new PrinterCode(id, "code " + id))
                .doOnSubscribe(s -> log.info("PC subscribed to fetchPrinterCodes : {}", s.toString()))
                .doOnTerminate(() -> log.info("PC finished call to fetchPrinterCodes "));
    }

    private Mono<Map<String, PrinterCode>> mappingPrinterCodes(Flux<Printer> printerFlux) {
        return printerFlux.collectList().map(printers -> {
                    List<String> printerCodes = printers.stream()
                            .map(Printer::getCode)
                            .toList();
                    Flux<PrinterCode> printerCodeFlux = fetchPrinterCodes(printerCodes);
                    return printerCodeFlux.collectList()
                            .map(ppp -> ppp.stream()
                                    .collect(Collectors.toMap(
                                            pcode -> pcode.getCode() + " from mapping",
                                            Function.identity())));
                })
                .flatMap(mono -> mono);
    }
}
Output:
start
14:33:26.469 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@22eeefeb
14:33:26.480 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@5579bb86
14:33:26.480 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.484 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@52525845
14:33:26.484 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.485 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@1f021e6c
14:33:26.486 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.486 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@103f852
14:33:26.486 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 1 | Printer code 1
14:33:26.492 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@61ca2dfa
14:33:26.492 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.492 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@4b53f538
14:33:26.492 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.492 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@134593bf
14:33:26.492 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.492 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@4bb4de6a
14:33:26.492 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 2 | Printer code 2
14:33:26.493 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@7ba18f1b
14:33:26.493 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.493 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@2f8f5f62
14:33:26.493 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.493 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@1068e947
14:33:26.493 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.493 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@7dc222ae
14:33:26.493 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 3 | Printer code 3
14:33:26.493 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@aecb35a
14:33:26.494 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.494 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@5fcd892a
14:33:26.494 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.494 [main] INFO MainReactive -- P subscribed to fetchPrinters : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@8b87145
14:33:26.494 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.494 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@6483f5ae
14:33:26.494 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 4 | Printer code 4
14:33:26.494 [main] INFO MainReactive -- P finished call to fetchPrinters
I tried:
- using different operators instead of flatMap, such as concatMap, etc.
- using share() or cache() on the Flux
Which parts of the code could be causing the problem, and why does this happen?
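
One possible reading of the log, offered as a sketch rather than a definitive answer: a Flux built with `Flux.fromIterable` is cold, so every subscription re-runs the whole chain. In `example()`, `printers` is subscribed once by the outer pipeline, while `mappingPrinterCodes` (which subscribes to `printers` again) is subscribed inside the per-printer lambda, and twice there because of `Mono.zip(mappingPrinterCodes, mappingPrinterCodes)`. For four printers that gives 1 + 4 × 2 = 9 subscriptions to `fetchPrinters` and 8 to `fetchPrinterCodes`, which matches the counts in the log. The version below collects the printers once and builds the mapping once before producing responses. The three records, the subscription counters, and the `responses` and `toResponse` helpers are assumptions made for illustration, since the post does not show those classes.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleSubscriptionSketch {

    // Hypothetical stand-ins for the classes the question does not show.
    record Printer(Long id, String code) {}
    record PrinterCode(String id, String code) {}
    record PrinterResponse(String value) {}

    // Count subscriptions so the effect is observable without a logger.
    static final AtomicInteger printerCalls = new AtomicInteger();
    static final AtomicInteger codeCalls = new AtomicInteger();

    static Flux<Printer> fetchPrinters(List<Long> ids) {
        return Flux.fromIterable(ids)
                .map(id -> new Printer(id, "code " + id))
                .doOnSubscribe(s -> printerCalls.incrementAndGet());
    }

    static Flux<PrinterCode> fetchPrinterCodes(List<String> codes) {
        return Flux.fromIterable(codes)
                .map(code -> new PrinterCode(code, code + " from mapping"))
                .doOnSubscribe(s -> codeCalls.incrementAndGet());
    }

    static PrinterResponse toResponse(Printer p, Map<String, PrinterCode> byCode) {
        return new PrinterResponse("result " + p.code() + " | " + byCode.get(p.code()).code());
    }

    static Flux<PrinterResponse> responses(List<Long> ids) {
        return fetchPrinters(ids)
                .collectList()                       // one subscription to fetchPrinters
                .flatMapMany(printers -> {
                    List<String> codes = printers.stream().map(Printer::code).toList();
                    Mono<Map<String, PrinterCode>> mapping =
                            fetchPrinterCodes(codes).collectMap(PrinterCode::id);
                    // The mapping is subscribed once; every printer reuses the same Map.
                    return mapping.flatMapMany(byCode ->
                            Flux.fromIterable(printers).map(p -> toResponse(p, byCode)));
                });
    }

    public static void main(String[] args) {
        List<PrinterResponse> result = responses(List.of(1L, 2L, 3L, 4L)).collectList().block();
        System.out.println(result);
        System.out.println("fetchPrinters subscriptions: " + printerCalls.get());
        System.out.println("fetchPrinterCodes subscriptions: " + codeCalls.get());
    }
}
```

The design choice here is to nest the dependent steps, so that the collected printer list and the mapping are both in scope when the responses are built, instead of referencing the same cold publisher from several places.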
More details here: https://stackoverflow.com/questions/789 ... ux-flatmap