Spring Flux Stream HeartBeat Асинхронное исключениеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Spring Flux Stream HeartBeat Асинхронное исключение

Сообщение Anonymous »

У меня есть контроллер, который выглядит так:
@RestController
public class MyController {

private final Flux flux1;
private final Flux flux2;

@GetMapping(produces = APPLICATION_NDJSON_VALUE)
public Flux subscribe() {

var mergedFlux = Flux.merge(this.flux1, this.flux2).doOnCancel(() -> {
log.debug("canceled");
}).doOnError(e -> {
log.error("error", e);
});

return keepAlive(Duration.of(10, ChronoUnit.SECONDS);, mergedFlux);
}

private Flux keepAlive(Duration interval, Flux flux) {
Flux heartBeat = Flux.interval(interval)
.map(t -> new HeartBeat()).doFinally(signalType -> log.debug("HearBeat
closed"));
return Flux.merge(flux, heartBeat);
}
}

Проблема в том, что всякий раз, когда я закрываю соединение клиента и отправляется следующий HeartBeat, я получаю следующее исключение:
Exception in thread "AsyncExecutor-1" java.lang.IllegalStateException: A non-container (application) thread attempted to use the AsyncContext after an error had occurred and the call to AsyncListener.onError() had returned. This is not allowed to avoid race conditions
Моя конфигурация выглядит так:
@Configuration
public class Config implements WebMvcConfigurer {

@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(poolTaskExecutor());
}

@Bean(name = "AsyncExecutor")
public AsyncTaskExecutor poolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(2);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return executor;
}

Это происходит только тогда, когда я использую метод KeepAlive для отправки HeartBeats между ними, чтобы соединение оставалось открытым и не истекало время ожидания.
Как я могу это предотвратить?
Я использую Spring Web MVC, а не Spring Webflux.
Это моя настройка приемников и потока :
@Configuration
public class SinkConfig {

@Bean
public Sinks.Many flux1() {
return Sinks.many().replay().latest();
}

@Bean
public Flux flux1(Sinks.Many sink) {
return sink.asFlux();
}

@Bean
public Sinks.Many flux2() {
return Sinks.many().replay().latest();
}

@Bean
public Flux flux2(Sinks.Many sink) {
return sink.asFlux();
}
}


Подробнее здесь: https://stackoverflow.com/questions/791 ... -exception
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»