Работа с медленным издателем в реакторе проекта concatMap ⇐ JAVA
Работа с медленным издателем в реакторе проекта concatMap
I've below code which programmatically generates Flux using generate. Since the result is Flux I'm using concatMap to map result to Flux. The producer of Mono works based on throttling. If too many requests are sent in short period of time it results in exception. To deal with this I've provided a subscriber that calls request(n) in hookOnSubscribe() and hookOnNext(). I want remote API to be called only request(n) times. This is like pull based request. But concatMap request upstream publisher even when there is no demand from downstream and it results in throttling exception. How can I make sure that remote API is called only request(n) times?
Flux generate = Flux.generate( () -> queryRequest, (state, sink) -> { if(!state.isPrepared() || !state.isDone()) { Mono nextPage = execute(state).doOnNext(result -> { QueryResult result1 = (QueryResult) result; state.setContKey(result1.getContinuationKey()); }); sink.next(nextPage); } else { sink.complete(); } return state; } ); generate.concatMap(resultMono -> resultMono);
Источник: https://stackoverflow.com/questions/781 ... -concatmap
I've below code which programmatically generates Flux using generate. Since the result is Flux I'm using concatMap to map result to Flux. The producer of Mono works based on throttling. If too many requests are sent in short period of time it results in exception. To deal with this I've provided a subscriber that calls request(n) in hookOnSubscribe() and hookOnNext(). I want remote API to be called only request(n) times. This is like pull based request. But concatMap request upstream publisher even when there is no demand from downstream and it results in throttling exception. How can I make sure that remote API is called only request(n) times?
Flux generate = Flux.generate( () -> queryRequest, (state, sink) -> { if(!state.isPrepared() || !state.isDone()) { Mono nextPage = execute(state).doOnNext(result -> { QueryResult result1 = (QueryResult) result; state.setContKey(result1.getContinuationKey()); }); sink.next(nextPage); } else { sink.complete(); } return state; } ); generate.concatMap(resultMono -> resultMono);
Источник: https://stackoverflow.com/questions/781 ... -concatmap
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Попытка найти код продукта MSI для Java 8 Update 25 в работе с издателем WSUS Package
Anonymous » » в форуме JAVA - 0 Ответы
- 5 Просмотры
-
Последнее сообщение Anonymous
-