Почему возникает слишком много подписок при использовании Flux. FlatMapMany , Flux. FlatMap и Mono.zip вместеJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Почему возникает слишком много подписок при использовании Flux. FlatMapMany , Flux. FlatMap и Mono.zip вместе

Сообщение Anonymous »

Я пытаюсь написать реактивную версию кода, которая отлично работает при блокировке. Я использую Spring-boot-starter-webflux.
Блокирующий (синхронный) упрощенный код, написанный в псевдокоде, будет выглядеть так:

Код: Выделить всё

// List of ids of printers which are fetched from a web service
List  printerIds =  List.of(1L, 2L, 3L, 4L) ;

// retrieve the printers using a rest call
List
 printers = printersforIds(printerIds)

// create additional mapping created by calling yet another web service
Map mappingPrinterCodes = mappingPrinterCodes(printers.getAllCodes());

// In order to create a single PrinterResponse object I need to use values from single Printer object and from the mappingPrinterCodes
List printerGrid = printers.map(p-> { createResponse(p,mappingPrinterCodes )}

И небольшая визуализация:
Изображение
Я пытался написать это с использованием реактивных библиотек:

Код: Выделить всё

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class MainReactive {

public static void main(String[] args) {
System.out.println("start");
MainReactive mainReactive = new MainReactive();
mainReactive.example();

}

public void example() {
Mono[*]> printerIds = Mono.just(List.of(1L, 2L, 3L, 4L));

Flux printers = printerIds.flatMapMany(this::fetchPrinters);
Mono mappingPrinterCodes = mappingPrinterCodes(printers);

Flux printerResponseFlux = printers.map(
printer -> Mono.zip(mappingPrinterCodes, mappingPrinterCodes).map(
tuple -> {
// here the important part is that I have to create new object based on both mappingPrinterCodes and the single printer itself
String code = printer.getCode();
return new PrinterResponse("result " + code + tuple.getT1().get(code));
}))
.flatMap(m -> m);
printerResponseFlux.subscribe(System.out::println);
}

public Flux fetchPrinters(List ids) {
return Flux.fromIterable(ids)
.map(id -> new Printer(id, "code " + id))
.doOnSubscribe(s -> log.info("P subscribed to fetchPrinters  : {}", s.toString()))
.doOnTerminate(() -> log.info("P finished call to fetchPrinters "));
}

public Flux fetchPrinterCodes(List ids) {
return Flux.fromIterable(ids)
.map(id -> new PrinterCode(id, "code " + id))
.doOnSubscribe(s -> log.info("PC subscribed to fetchPrinterCodes  : {}", s.toString()))
.doOnTerminate(() -> log.info("PC finished call to fetchPrinterCodes "));
}

private Mono mappingPrinterCodes(Flux printerFlux) {
return printerFlux.collectList().map(printers -> {
List printerCodes = printers.stream()
.map(Printer::getCode)
.toList();

Flux printerCodeFlux = fetchPrinterCodes(printerCodes);

return
printerCodeFlux.collectList()
.map(ppp -> ppp.
stream().
collect(
Collectors.toMap(
pcode -> pcode.getCode() + " from mapping",
Function.identity())));
})
.flatMap(mono -> mono);
}

}

хотя код в конечном итоге распечатает нужные элементы. Я вижу, что активировано слишком много подписок. Журналы:

Код: Выделить всё

start
14:33:26.469 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@22eeefeb
14:33:26.480 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@5579bb86
14:33:26.480 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.484 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@52525845
14:33:26.484 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.485 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@1f021e6c
14:33:26.486 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.486 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@103f852
14:33:26.486 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 1 | Printer code 1
14:33:26.492 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@61ca2dfa
14:33:26.492 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.492 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@4b53f538
14:33:26.492 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.492 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@134593bf
14:33:26.492 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.492 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@4bb4de6a
14:33:26.492 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 2 | Printer code 2
14:33:26.493 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@7ba18f1b
14:33:26.493 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.493 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@2f8f5f62
14:33:26.493 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.493 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@1068e947
14:33:26.493 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.493 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@7dc222ae
14:33:26.493 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 3 | Printer code 3
14:33:26.493 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@aecb35a
14:33:26.494 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.494 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@5fcd892a
14:33:26.494 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
14:33:26.494 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@8b87145
14:33:26.494 [main] INFO MainReactive -- P finished call to fetchPrinters
14:33:26.494 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@6483f5ae
14:33:26.494 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 4 | Printer code 4
14:33:26.494 [main] INFO MainReactive -- P finished call to fetchPrinters
Итак, как вы видите, «P подписан на fetchPrinters» печатается 9 раз. Я ожидаю, что он будет напечатан только один раз.
Я пробовал
  • использовать разные подходы вместо FlatMap. использовать concatMap и т. д.
  • Используйте Share() или кэш() на Flux
Ничего из этого не принесло желаемый эффект. Я думаю, что делаю фундаментальную ошибку со своей реактивной (неблокирующей) логикой, просто я не могу определить, где она. Приведенный мною пример уже упрощен по сравнению с реальным, где создаются еще 2 сопоставления и выполняются еще 2 вызова веб-служб.
Какие части кода могут быть причина проблемы и почему это происходит?

Подробнее здесь: https://stackoverflow.com/questions/789 ... ux-flatmap
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»