Передача потока через потоки JavaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Передача потока через потоки Java

Сообщение Anonymous »

У меня есть Akka Flow. Моя цель — передать ByteStrings во внешнюю библиотеку и отправить ее результат вниз по течению. Однако внешняя библиотека потребляет и выводит потоки Java. Я не хочу использовать Flow.fold(), потому что поток потенциально огромен. Я хочу передать поток Akka Flow в потоки Java, надеясь уменьшить объем памяти.
Ввод:

Код: Выделить всё

static Source inputData() {
final List list = IntStream.range(0, 1000).boxed().toList();
return Source.from(list).map(i -> ByteString.fromString(i.toString()));
}
Это моя первая попытка:

Код: Выделить всё

Flow transformFlow() {
// access to OutputStream and Source
final Pair pair = StreamConverters.asOutputStream().preMaterialize(actorSystem);
final OutputStream outStream = pair.first();
final Sink sink = StreamConverters.fromOutputStream(() -> outStream);

final int bufferSize = 1024;
final Pair sourcePair = Source
.queue(bufferSize, OverflowStrategy.backpressure())
.preMaterialize(actorSystem);
final SourceQueueWithComplete queue = sourcePair.first();
final Source inputSource = sourcePair.second();

// Copy inputStream to outStream
final ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
try (final InputStream inputStream = inputSource.runWith(StreamConverters.asInputStream(Duration.ofSeconds(10)), actorSystem)) {
// do some data manipulation
final byte[] buffer = new byte[1];
int len;
while ((len = inputStream.read(buffer)) != -1) {

outStream.write(buffer, 0, len);
outStream.write((byte) '\n');
}
outStream.flush();
} catch (final IOException e) {
throw new RuntimeException(e);
}
});

// Flow to push incoming data into the queue
final Flow storeIncoming = Flow.of(ByteString.class).map(bs -> {
queue.offer(bs);
return ByteString.emptyByteString();
});

return Flow.of(ByteString.class).via(storeIncoming).via(Flow.fromSinkAndSourceCoupled(sink, pair.second()));
}
К сожалению, это не работает. В тесте я получаю только часть входных данных. Я думаю, что outStream завершается, в то время как storeIncomming нужно записать еще больше данных.
Возможно, мне придется каким-то образом использовать watchTermination(). Моя вторая попытка выглядит так:

Код: Выделить всё

Flow transformFlow() {
final Pair pair = StreamConverters.asOutputStream().preMaterialize(actorSystem);
final OutputStream outStream = pair.first();
final Sink sink = StreamConverters.fromOutputStream(() -> outStream);

final int bufferSize = 1024;
final Pair sourcePair = Source
.queue(bufferSize, OverflowStrategy.backpressure())
.preMaterialize(actorSystem);
final SourceQueueWithComplete queue = sourcePair.first();
final Source inputSource = sourcePair.second();

final ExecutorService executorService = Executors.newSingleThreadExecutor();
final CompletableFuture copyDone = new CompletableFuture();
executorService.submit(() -> {
try (final InputStream inputStream = inputSource.runWith(StreamConverters.asInputStream(Duration.ofSeconds(10)), actorSystem)) {
final byte[] buffer = new byte[1];
int len;
while ((len = inputStream.read(buffer)) != -1) {
outStream.write(buffer, 0, len);
outStream.write((byte) '\n');
}
outStream.flush();
copyDone.complete(null);
} catch (final IOException e) {
copyDone.completeExceptionally(e);
}
});

final Flow storeIncoming = Flow
.of(ByteString.class)
.mapAsync(1, bs -> queue.offer(bs).thenApply(result -> ByteString.emptyByteString()))
.watchTermination((notUsed, done) -> {
done.whenComplete((ignore, ex) -> queue.complete());
return notUsed;
});

return Flow.of(ByteString.class).via(storeIncoming).via(Flow.fromSinkAndSourceCoupled(sink, pair.second())).watchTermination((mat, done) -> {
done.whenComplete((ignore, ex) -> {
try {
copyDone.get();
} catch (final Exception ignored) {
}
executorService.shutdown();
});
return mat;
});
}
Похоже, это работает лучше; Я получаю больше данных, чем при первой попытке. Но еще не все. Кроме того, не «перерастает» ли это в решение, в котором я сначала сохраняю весь поток в памяти, точно так же, как я бы вызывал Flow.fold(), чего мне очень хотелось бы избежать (проблемы с занимаемой площадью)?
Как я могу синхронизировать чтение и запись в потоки? Нет ли более элегантного способа добиться этого?

Подробнее здесь: https://stackoverflow.com/questions/797 ... va-streams
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»