Почему возникает слишком много подписок при использовании Flux. FlatMapMany , Flux. FlatMap и Mono.zip вместеJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Почему возникает слишком много подписок при использовании Flux. FlatMapMany , Flux. FlatMap и Mono.zip вместе

Сообщение Anonymous »

Я пытаюсь написать реактивную версию кода, которая отлично работает при блокировке. Я использую Spring-boot-starter-webflux.
Блокирующий (синхронный) упрощенный код, написанный в псевдокоде, будет выглядеть так:

Код: Выделить всё

// List of ids of printers which are fetched from a web service
List  printerIds =  List.of(1L, 2L, 3L, 4L) ;

// retrieve the printers using a rest call
List
 printers = printersforIds(printerIds)

// create additional mapping created by calling yet another web service
Map mappingPrinterCodes = mappingPrinterCodes(printers.getAllCodes());

// In order to create a single PrinterResponse object I need to use values from single Printer object and from the mappingPrinterCodes
List printerGrid = printers.map(p-> { createResponse(p,mappingPrinterCodes )}

И небольшая визуализация:
Изображение
Я пытался написать это с использованием реактивных библиотек:

Код: Выделить всё

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

@Slf4j
public class MainReactive {

public static void main(String[] args) {
System.out.println("start");
MainReactive mainReactive = new MainReactive();
mainReactive.example();

}

public void example() {
Mono[*]> printerIds = Mono.just(List.of(1L, 2L, 3L, 4L));

Flux printers = printerIds.flatMapMany(this::fetchPrinters);
Mono mappingPrinterCodes = mappingPrinterCodes(printers);

Flux printerResponseFlux = printers.map(
printer -> Mono.zip(mappingPrinterCodes, mappingPrinterCodes).map(
tuple -> {
// here the important part is that I have to create new object based on both mappingPrinterCodes and the single printer itself
String code = printer.getCode();
return new PrinterResponse("result " + code + tuple.getT1().get(code));
}))
.flatMap(m -> m);
printerResponseFlux.subscribe(System.out::println);
}

public Flux fetchPrinters(List ids) {
return Flux.fromIterable(ids)
.map(id -> new Printer(id, "code " + id))
.doOnSubscribe(s -> log.info("P subscribed to fetchPrinters  : {}", s.toString()))
.doOnTerminate(() -> log.info("P finished call to fetchPrinters "));
}

public Flux fetchPrinterCodes(List ids) {
return Flux.fromIterable(ids)
.map(id -> new PrinterCode(id, "code " + id))
.doOnSubscribe(s -> log.info("PC subscribed to fetchPrinterCodes  : {}", s.toString()))
.doOnTerminate(() -> log.info("PC finished call to fetchPrinterCodes "));
}

private Mono mappingPrinterCodes(Flux printerFlux) {
return printerFlux.collectList().map(printers -> {
List printerCodes = printers.stream()
.map(Printer::getCode)
.toList();

Flux printerCodeFlux = fetchPrinterCodes(printerCodes);

return
printerCodeFlux.collectList()
.map(ppp -> ppp.
stream().
collect(
Collectors.toMap(
pcode -> pcode.getCode() + " from mapping",
Function.identity())));
})
.flatMap(mono -> mono);
}

}

хотя код в конечном итоге распечатает нужные элементы. Я вижу, что активировано слишком много подписок. Голова из журнала:

Код: Выделить всё

start
16:02:16.751 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@22eeefeb
16:02:16.761 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@5579bb86
16:02:16.762 [main] INFO MainReactive -- P finished call to fetchPrinters
16:02:16.766 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@52525845
16:02:16.766 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
16:02:16.767 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@1f021e6c
16:02:16.768 [main] INFO MainReactive -- P finished call to fetchPrinters
16:02:16.768 [main] INFO MainReactive -- PC subscribed to fetchPrinterCodes  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@103f852
16:02:16.768 [main] INFO MainReactive -- PC finished call to fetchPrinterCodes
Printer result code 1 | Printer code 1
16:02:16.773 [main] INFO MainReactive -- P subscribed to fetchPrinters  : reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber@61ca2dfa
Я пробовал
  • использовать разные подходы, вместо FlatMap использовать concatMap и т. д.
  • Используйте Share() или кэш() на Flux
Ничто из этого не принесло желаемого эффекта. Я думаю, что делаю фундаментальную ошибку со своей реактивной (неблокирующей) логикой, просто я не могу определить, где она. Приведенный мною пример уже упрощен по сравнению с реальным, где создаются еще 2 сопоставления и выполняются еще 2 вызова веб-служб.
Какие части кода могут быть причина проблемы и почему это происходит?

Подробнее здесь: https://stackoverflow.com/questions/789 ... ux-flatmap
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»