Код: Выделить всё
@Test
void test() throws InterruptedException {
var sink = Sinks.many().unicast().onBackpressureBuffer();
sink.asFlux()
.groupBy(e -> e.charAt(0))
.flatMap(g -> g
.doOnNext(e -> System.out.println("got " + e))
.timeout(Duration.ofMillis(150), Mono.defer(() -> {
System.out.println("timeout");
return Mono.empty();
}))
.sample(Duration.ofMillis(100))
.doOnNext(e -> System.out.println("processed " + e)))
.subscribe();
sink.tryEmitNext("a1");
Thread.sleep(150);
sink.tryEmitNext("a2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
< /code>
А также мне нужно закрыть очередь для каждого вида сообщений после некоторого времени, чтобы он не потреблял никаких ресурсов. < /p>
он работает в целом. Если задержка между A1 Код: Выделить всё
got a1
got a2
processed a2
timeout
< /code>
a1Код: Выделить всё
got a1
processed a1
timeout
got a2
processed a2
timeout
< /code>
The message queue for a-events is cleared after timeout, created the new one and a2Проблема заключается в том, когда задержка составляет 150 мс. Иногда он получает A2 , но не обрабатывает его:
got a1
processed a1
got a2
timeout
< /code>
How to make this schema more stable. So it doesn't loose last event because of timeout?
Подробнее здесь: https://stackoverflow.com/questions/797 ... nd-timeout
Мобильная версия