Это моя текущая тестовая реализация.
Код: Выделить всё
Flow transformFlow() {
// access to OutputStream and Source
final Pair pair = StreamConverters.asOutputStream().preMaterialize(actorSystem);
final OutputStream outStream = pair.first();
final Sink sink = StreamConverters.fromOutputStream(() -> outStream);
final int bufferSize = 1024;
final Pair sourcePair = Source
.queue(bufferSize, OverflowStrategy.backpressure())
.preMaterialize(actorSystem);
final SourceQueueWithComplete queue = sourcePair.first();
final Source inputSource = sourcePair.second();
// Copy inputStream to outStream
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
try (final InputStream inputStream = inputSource.runWith(StreamConverters.asInputStream(Duration.ofSeconds(10)), actorSystem)) {
// do some data manipulation
final byte[] buffer = new byte[1];
int len;
while ((len = inputStream.read(buffer)) != -1) {
outStream.write(buffer, 0, len);
outStream.write((byte) '\n');
}
outStream.flush();
} catch (final IOException e) {
throw new RuntimeException(e);
}
});
// Flow to push incoming data into the queue
final Flow storeIncoming = Flow.of(ByteString.class).map(bs -> {
queue.offer(bs);
return ByteString.emptyByteString();
});
return Flow.of(ByteString.class).via(storeIncoming).via(Flow.fromSinkAndSourceCoupled(sink, pair.second()));
}
Как синхронизировать чтение и запись в потоки? Нет ли более элегантного способа добиться этого?
Подробнее здесь: https://stackoverflow.com/questions/797 ... va-streams
Мобильная версия