Я нашел два интересных примера:
- Spring-GraphQL
- Netflix-DGS
Кажется, оба управляют обратным давлением, хотя и с немного более настраиваемыми параметрами в Sinks.Many. А ConnectableFlux, кажется, управляет подписчиками более элегантно с помощью refCount....возможно, Sinks.Many тоже делает то же самое.
Я могу легко соединить разрыв между обоими подходами примерно такой:
Код: Выделить всё
@Component
public class SinkConfig {
private final Sinks.Many sink;
private final Flux reviewFlux;
public SinkConfig() {
Sinks.Many sinks = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
this.reviewFlux = sinks.asFlux().publish().refCount(1);
this.sink = sinks;
}
//Only subscribe to reviews written by a particular user
Publisher publisher(String input) {
return reviewFlux.filterWhen(x -> Mono.fromCallable(() -> x.getUsername().equals(input)));
}
void emit(Review review) {
sink.emitNext(review, Sinks.EmitFailureHandler.FAIL_FAST);
}
}
Буду очень признателен за любую помощь!
Подробнее здесь: https://stackoverflow.com/questions/790 ... ctableflux
Мобильная версия