Тупик при вызове Spring Webflux или WebClientJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Тупик при вызове Spring Webflux или WebClient

Сообщение Anonymous »

Мне нужно использовать Spring WebClient, но мне нужно его заблокировать, однако я знаю, что это противоречит шаблону и плохо. Но мне это нужно, поскольку определенный интерфейс, который я собираюсь реализовать, не является реактивным. С другой стороны, мне нужно дождаться всех ответов, прежде чем я смогу продолжить, поскольку на основе ответов существуют какие-то действия. Я пытаюсь упростить проблему с помощью воображаемого примера, который просто воспроизводит проблему. Реальную проблему я не могу описать, но отпущу ее с помощью этого воображаемого примера.
У нас есть определенный интерфейс SentenceMaker как:

Код: Выделить всё

public interface SentenceMaker {
String makeSentence(String word);
String getName();
}
Поставщики услуг теперь предоставляют услуги на конечных точках отдыха. (реальная проблема заключается в том, что они могли предоставлять услугу с помощью различных средств связи, скажем, файловой базы, очереди сообщений и т. д. Теперь некоторые из них работают над службами отдыха. Итак, этот интерфейс SentenceMaker скрывает эту реализацию и оно не определено нами, поэтому мы можем его изменить.
Я реализовал BroadcastService и обработку после получения всех ответов, скажем, сортировку предложений по длине предложений следующим образом:

Код: Выделить всё

@Service
@Slf4j
public class BroadcastService {

@Autowired
private SentenceMakerProvider sentenceMakerProvider;

public List broadcast(String word) {

List allResponses = sentenceMakerProvider.get()
.stream()
.map(sentenceMaker ->
CompletableFuture.supplyAsync(() -> {
log.info("Sending to == {} ==> to make a sentence.", sentenceMaker.getName());
return sentenceMaker.makeSentence(word);
})
.thenApply(sentence -> {
log.info("Got sentence: {}", sentence);
return sentence;
})
)
.toList();

CompletableFuture
.allOf(allResponses.stream().toArray(CompletableFuture[]::new))
.join();

return allResponses
.stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(String::length))
.toList();

}
Я использовал CompletableFuture, чтобы одновременно выполнять все вызовы к разным поставщикам услуг.
И есть SentenceMakerProvider, который сохраняет ссылку на реализации всех поставщиков услуг:

Код: Выделить всё

@Component
@EnableConfigurationProperties(ApplicationProperties.class)
public class SentenceMakerProvider {

private List sentenceMakersReactive;

public SentenceMakerProvider(
ApplicationProperties applicationProperties) {

sentenceMakersReactive = applicationProperties
.getEndpointConfigList()
.stream()
.map(endpointConfig -> {
HttpClient httpClient = HttpClient.create()
.wiretap(endpointConfig.getName(), LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL);
ClientHttpConnector clientConnector = new ReactorClientHttpConnector(httpClient);

WebClient webClient = WebClient.builder()
.clientConnector(clientConnector)
.baseUrl(endpointConfig.getBasePath())
.build();

return new SentenceMaker() {
@Override
public String makeSentence(String word) {
return webClient
.post()
.uri("/make-sentence")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.acceptCharset(StandardCharsets.UTF_8)
.body(BodyInserters.fromValue(word))
.retrieve()
.bodyToMono(String.class)
.block();
}

@Override
public String getName() {
return endpointConfig.getName();
}
};
})
.toList();
}

public List get(){
return sentenceMakersReactive;
}

}
Поскольку число поставщиков услуг увеличивается, конечные точки могут быть добавлены с помощью конфигурации, которая определена во внешнем свойстве ApplicationProperties:

Код: Выделить всё

@ConfigurationProperties(prefix = "endpoints")
@Data
public class ApplicationProperties {
private List  endpointConfigList = Collections.emptyList();
}

@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class EndpointConfig {
private String name;
private String basePath;
}
Которые настроены в application.yml как:

Код: Выделить всё

endpoints:
endpoint-config-list:
-
base-path: http://localhost:8081
name: endpoint-one
-
base-path: http://localhost:8082
name: endpoint-two
При отправке запроса в приложение поток застрял в тупике. Один из потоков, созданных CompletableFuture, который достигает веб-клиента, которому никогда не передается поток от реактора для вызова конечной точки.
Может кто-нибудь помочь мне понять, как этот тупик случается? Если я изменю веб-клиент на клиент rest, он будет работать, но я до сих пор не понимаю, почему один из потоков, созданных CompeletableFuture, не может продолжить вызов конечной точки.
Я ожидал, что веб-клиент для второго вызова также получит поток от реактора для продолжения своей работы.

Подробнее здесь: https://stackoverflow.com/questions/793 ... -webclient
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»