Передача потока через потоки JavaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Передача потока через потоки Java

Сообщение Anonymous »

У меня есть Akka Flow. Моя цель — передать ByteStrings во внешнюю библиотеку и отправить ее результат вниз по течению. Однако внешняя библиотека потребляет и выводит потоки Java. Я не хочу использовать Flow.fold(), потому что поток потенциально огромен. Я хочу передать поток Akka Flow в потоки Java, надеясь уменьшить объем памяти.
Это моя текущая тестовая реализация.

Код: Выделить всё

Flow transformFlow() {
// access to OutputStream and Source
final Pair pair = StreamConverters.asOutputStream().preMaterialize(actorSystem);
final OutputStream outStream = pair.first();
final Sink sink = StreamConverters.fromOutputStream(() -> outStream);

final int bufferSize = 1024;
final Pair sourcePair = Source
.queue(bufferSize, OverflowStrategy.backpressure())
.preMaterialize(actorSystem);
final SourceQueueWithComplete queue = sourcePair.first();
final Source inputSource = sourcePair.second();

// Copy inputStream to outStream
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
try (final InputStream inputStream = inputSource.runWith(StreamConverters.asInputStream(Duration.ofSeconds(10)), actorSystem)) {
// do some data manipulation
final byte[] buffer = new byte[1];
int len;
while ((len = inputStream.read(buffer)) != -1) {

outStream.write(buffer, 0, len);
outStream.write((byte) '\n');
}
outStream.flush();
} catch (final IOException e) {
throw new RuntimeException(e);
}
});

// Flow to push incoming data into the queue
final Flow storeIncoming = Flow.of(ByteString.class).map(bs -> {
queue.offer(bs);
return ByteString.emptyByteString();
});

return Flow.of(ByteString.class).via(storeIncoming).via(Flow.fromSinkAndSourceCoupled(sink, pair.second()));
}
К сожалению, это не работает. В тесте я получаю только часть входных данных. Я думаю, что outStream завершается, а у storeIncomming остается еще больше данных для записи.
Как синхронизировать чтение и запись в потоки? Нет ли более элегантного способа добиться этого?

Подробнее здесь: https://stackoverflow.com/questions/797 ... va-streams
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»