У меня есть вариант использования, когда мой сервер выдает определенные исключения при обработке запроса клиента, Мне нужно поддерживать соединение открытым, но не отправлять данные обратно клиенту. В настоящее время, когда возникают эти исключения, Netty автоматически закрывает соединение. Я попытался добавить исключение Caught(...) в свой собственный обработчик, но, похоже, оно так и не достигло его. Установка флага ChannelOption.AUTO_CLOSE в значение false также не работает, поскольку, похоже, это применимо только к исключениям, возникающим во время записи(). В моем случае мы никогда не записываем какие-либо данные обратно клиенту.
Ниже я применил обходной путь, чтобы гарантировать запуск методаExceptionCaught(), чтобы я мог соответствующим образом обрабатывать исключения и оставьте соединение открытым:
Код: Выделить всё
DisposableServer someTcpServer = tcpServer
.host("12.123.456.789")
.port(12345)
.wiretap(true)
.doOnBind(server -> log.info("Starting listener..."))
.doOnBound(server -> log.info("Listener started on host: {}, port: {}", server.host(), server.port()))
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.AUTO_CLOSE,false)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.doOnConnection(connection -> {
InetSocketAddress socketAddress = (InetSocketAddress) connection.channel().remoteAddress();
log.info("Client has connected. Host: {}, Port: {}",
socketAddress.getAddress().getHostAddress(), socketAddress.getPort());
})
.doOnChannelInit((observer, channel, remoteAddress) ->
channel.pipeline()
.addFirst(new TcpServerHandler())
)
.handle((inbound, outbound) ->
inbound
.receive()
.asByteArray()
.flatMap(req -> processRequest(req))
.flatMap(rsp -> outbound.sendByteArray(Flux.just(rsp))
//any exceptions thrown during the above processRequest()...
.onErrorResume(throwable -> {
//...will get handled here
inbound.withConnection(connection -> connection.channel().pipeline().fireExceptionCaught(throwable));
return Mono.empty();
})
).bindNow();
someTcpServer.onDispose().block();
}
Код: Выделить всё
@Slf4j
public class TcpServerHandler extends ChannelDuplexHandler {
private final AtomicLong startTime = new AtomicLong(0L);
private final AtomicLong endTime = new AtomicLong(0L);
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof BusinessLogicException1 ||
cause instanceof BusinessLogicException2) {
endTime.set(System.nanoTime());
log.info("Took {} ms to process", Duration.ofNanos(endTime.get() - startTime.get()).toMillis()))
//for these specific exceptions, keep the connection open
ctx.fireChannelActive();
} else {
//When an exception that wasn't one of the above^ was thrown
//I had super.exceptionCaught(...) here and this was causing my
//exceptionCaught(...) method to be called twice, so I removed the
//call to super.exceptionCaught(...) and just don't do anything.
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
startTime.set(System.nanoTime());
ctx.fireChannelRead(msg);
}
}
Подробнее здесь: https://stackoverflow.com/questions/788 ... exceptions