Я хотел бы создать из него поток, который буферизует N событий, и в случае переполнения сразу закрывается с ошибкой, например если получатель медленно обрабатывает события.
Проблема в том, что onBackpressureBuffer(4, { println("Overflow on $it ”) }, BufferOverflowStrategy.ERROR) сначала обрабатывает буфер, а затем выдает ошибку. Я бы хотел, чтобы ошибка выдавалась в момент переполнения при печати $it.
Вот небольшая демонстрация с журналами
Код: Выделить всё
private val sink: Sinks.Many = Sinks
.many()
.multicast()
.directBestEffort()
fun createBufferFlux(): Flux {
return sink.asFlux()
.onBackpressureBuffer(4, { println("Overflow on $it") }, BufferOverflowStrategy.ERROR)
}
fun emitTestEvents() {
(1..20).forEach {
sink.tryEmitNext(it)
Thread.sleep(50)
}
}
fun main() {
val completable = CompletableFuture()
createBufferFlux()
.delayElements(Duration.ofMillis(100))
.subscribe(
{ println("Slow flux received: $it") },
{ println("Slow flux error: $it");completable.complete(Unit) },
{ println("Slow flux completed");completable.complete(Unit) }
)
ForkJoinPool.commonPool().execute {
emitTestEvents()
}
completable.join()
}
Код: Выделить всё
Slow flux received: 1
Slow flux received: 2
Slow flux received: 3
Slow flux received: 4
Slow flux received: 5
Overflow on 11 # im want close slow flux here
Slow flux received: 6
Slow flux received: 7
Slow flux received: 8
Slow flux received: 9
# but error only occured, when slow flux process buffer
Slow flux error: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Код: Выделить всё
Sinks.many()
.multicast()
.onBackpressureBuffer(4, false)
Подробнее здесь: https://stackoverflow.com/questions/790 ... s-overflow
Мобильная версия