Код: Выделить всё
pipeline.apply("Read message",
KafkaIO.read()
.withTopic("MyTopic")
.withCluster("MyCluster")
.withoutMetadata())
.apply("Filter input", Filter.by(message -> Objects.nonNull(message.getValue()))
.apply("call api for each message", // What is the most efficient way to do this, is each api call blocking?)
, если вы называете API в каждом применении в луче, это асихроновое или блокирует и пытается исчерпать все доступные потоки?>
Подробнее здесь: https://stackoverflow.com/questions/760 ... am-in-java