Я реализовал два приложения: первое добавляет записи в поток, второе потребляет сообщения из потока.
Это работает, но через некоторое время (обычно после обработки более 80 000 сообщений) приложение-потребитель выбрасывает исключение: «org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379».
Я установил Redis 6.2.1 на Ubuntu (подсистема Windows для Linux в Win10) и запустил его с конфигурацией по умолчанию (порт 6379). То же самое происходит, когда я запускаю Redis в Docker.
Я тестировал производительность Redis, поэтому сообщения добавлялись в цикле без задержки.
Я добавляю сообщения в поток следующим образом:
Код: Выделить всё
// Build a record that carries the payload and is addressed to the "my-stream" key.
ObjectRecord record = StreamRecords.newRecord().ofObject(message).withStreamKey("my-stream");
// Append the record to the stream through the template's stream operations.
redisTemplate.opsForStream().add(record);
Код: Выделить всё
// Build receiver options with a 2-second poll timeout.
StreamReceiver.StreamReceiverOptions options =
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofSeconds(2))
.build();
// Create the receiver on top of the supplied connection factory.
StreamReceiver receiver = StreamReceiver.create(connectionFactory, options);
// Read from the start of "my-stream"; the key must match the one records are added to
// (it was misspelled as "my-tream", which points at a different stream).
Flux messages = receiver.receive(StreamOffset.fromStart("my-stream"));
// Hand every received record to the subscriber.
messages.subscribe(new StreamSubscriber());
Код: Выделить всё
org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1553)
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.lambda$getConnectionAsync$0(LettuceConnectionFactory.java:1491)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at io.lettuce.core.DefaultConnectionFuture.lambda$null$0(DefaultConnectionFuture.java:257)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:254)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:405)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
... 30 more
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
at io.lettuce.core.RedisClient.lambda$transformAsyncConnectionException$20(RedisClient.java:767)
at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:253)
... 22 more
Caused by: java.util.concurrent.CompletionException: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
... 20 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Код: Выделить всё
RedisTemplate. Я также заметил, что соединение с Redis закрывается после отправки сообщения в поток, а при отправке следующего сообщения создаётся новое соединение.
Код: Выделить всё
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] sk.kedros.learn.camel.redis.Publisher : sending message: {"obuSn":"eeee00a5-616e-46ad-80e4-50780fab1336","lon":0.0,"lat":0.0,"data":"99761 - provider1"}
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils : Fetching Redis Connection from RedisConnectionFactory
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] io.lettuce.core.RedisChannelHandler : dispatching command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() writeAndFlush command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() done
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandEncoder : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Received: 22 bytes, 1 commands in the stack
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] i.l.core.protocol.RedisStateMachine : Decode done, empty stack: true
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Completing command AsyncCommand [type=XADD, output=StatusOutput [output=1618125131455-2, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils : Closing Redis Connection.
Код: Выделить всё
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] onStreamMessage(MapBackedRecord{recordId=1618125112425-0, kvMap={payload={"obuSn":"d066bdd7-21b5-46f0-81ca-09afe4fd6596","lon":0.0,"lat":1.0,"data":"68541 - provider1"}}}): Emitting item, slow-path
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.RedisStateMachine : Decode done, empty stack: true
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] Completing command SubscriptionCommand [type=XREAD, output=StreamReadOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisChannelHandler : closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] onComplete()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] scheduleIfRequired()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] scheduleIfRequired(): Activating subscription
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver : [stream: loc] scheduleIfRequired(): Activating subscription, offset ReadOffset(offset=1618125112425-0)
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient : Trying to get a Redis connection for: redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient : Resolved SocketAddress localhost:6379 using redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.AbstractRedisClient : Connecting to Redis at localhost:6379
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.protocol.CommandHandler : [channel=0x87e269ec, [id: 0x6e14c57c] (inactive), chid=0x3be0] channelRegistered()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] deactivating endpoint handler
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive() done
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] Reconnect scheduling disabled
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelUnregistered()
2021-04-11 09:11:52.432 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.AbstractRedisClient : Connecting to Redis at localhost:6379: localhost:6379
io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]
Я что-то упустил? Это проблема с установкой/настройкой Redis? Или это проблема клиента Lettuce? Или что-то ещё?
Подробнее здесь: https://stackoverflow.com/questions/670 ... ady-in-use
Мобильная версия