Как мне реализовать конвейер Netty, контролируемый потоком, с помощью Flowcontrollondler и HttpcontentDecompressor?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как мне реализовать конвейер Netty, контролируемый потоком, с помощью Flowcontrollondler и HttpcontentDecompressor?

Сообщение Anonymous »

Дублируя https://github.com/netty/netty/issues/15053:
Я пытаюсь создать контролируемый потоком трубопровода Netty. Поток контролируется в том смысле, что он издает несколько сообщений за вызов для чтения () . Я выделил проблему до неожиданного взаимодействия между FlowControllolhandler и httpcontentdecompressor , воспроизведенным в следующем тесте в наборе тестов Netty:

Код: Выделить всё

index d9f5cd5b8d..f8edf63a29 100644
--- a/codec-http/src/test/java/io/netty/handler/codec/http/HttpContentDecompressorTest.java
+++ b/codec-http/src/test/java/io/netty/handler/codec/http/HttpContentDecompressorTest.java
@@ -20,6 +20,7 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.handler.flow.FlowControlHandler;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;
@@ -70,4 +71,33 @@ public class HttpContentDecompressorTest {
assertEquals(2, readCalled.get());
assertFalse(channel.finishAndReleaseAll());
}
+
+    @Test
+    public void testFlowController() {
+        final AtomicInteger messagesEmitted = new AtomicInteger();
+        final EmbeddedChannel channel = new EmbeddedChannel(
+                new FlowControlHandler(),
+                new HttpContentDecompressor(),
+                new ChannelInboundHandlerAdapter() {
+                    @Override
+                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+                        messagesEmitted.incrementAndGet();
+                    }
+                });
+
+        channel.config().setAutoRead(false);
+
+        final HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+        response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
+
+        assertFalse(channel.writeInbound(response));
+        assertFalse(channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(1))));
+        assertFalse(channel.writeInbound(new DefaultHttpContent(Unpooled.buffer(1))));
+
+        assertEquals(1, messagesEmitted.get());
+        channel.read();
+        assertEquals(2, messagesEmitted.get());
+        channel.read();
+        assertEquals(3, messagesEmitted.get());
+    }
}
Специально, с отключенной Autoread и FlowControlloldler Я бы ожидал, что трубопровод издает ровно одно сообщение на чтение () , но на самом деле сегодня он издает все три сообщения одновременно:

Код: Выделить всё

org.opentest4j.AssertionFailedError:
Expected :1
Actual   :3


at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:527)
at io.netty.handler.codec.http.HttpContentDecompressorTest.testFlowController(HttpContentDecompressorTest.java:97)
...
This is happening because the FlowControlHandler fabricates its own READ COMPLETE messages when it empties its queue, but also passes through the READ COMPLETE message from upstream when its internal queue is empty, so the HttpContentDecompressor sees two READ COMPLETE events after processing the first READ. Первое такое событие устанавливает httpcontentdecoder#edlead на true , а затем второе из них запускает другой ctx.read () .
К сожалению, я не уверен, что проблема заключается в том, что Flowcontrolhandler испускает два прочтения в ряду, но не прочтет, но это меж, но я не считывает, но это зачтенное, но это зачтенное чтение, но это не считывание. Httpcontentdecoder должен иметь возможность обрабатывать эти события без запуска другого ctx.read () , или, действительно, если это общее поведение ожидается, и я просто держу его неправильно. Любой из этих компонентов сам по себе делает, но в сочетании они нет.

Подробнее здесь: https://stackoverflow.com/questions/796 ... ntrolhandl
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»