У нас есть определенный интерфейс SentenceMaker как:
Код: Выделить всё
public interface SentenceMaker {
String makeSentence(String word);
String getName();
}
Я реализовал BroadcastService и обработку после получения всех ответов, скажем, сортировку предложений по длине предложений следующим образом:
Код: Выделить всё
@Service
@Slf4j
public class BroadcastService {
@Autowired
private SentenceMakerProvider sentenceMakerProvider;
public List broadcast(String word) {
List allResponses = sentenceMakerProvider.get()
.stream()
.map(sentenceMaker ->
CompletableFuture.supplyAsync(() -> {
log.info("Sending to == {} ==> to make a sentence.", sentenceMaker.getName());
return sentenceMaker.makeSentence(word);
})
.thenApply(sentence -> {
log.info("Got sentence: {}", sentence);
return sentence;
})
)
.toList();
CompletableFuture
.allOf(allResponses.stream().toArray(CompletableFuture[]::new))
.join();
return allResponses
.stream()
.map(CompletableFuture::join)
.sorted(Comparator.comparingInt(String::length))
.toList();
}
И есть SentenceMakerProvider, который сохраняет ссылку на реализации всех поставщиков услуг:
Код: Выделить всё
@Component
@EnableConfigurationProperties(ApplicationProperties.class)
public class SentenceMakerProvider {
private List sentenceMakersReactive;
public SentenceMakerProvider(
ApplicationProperties applicationProperties) {
sentenceMakersReactive = applicationProperties
.getEndpointConfigList()
.stream()
.map(endpointConfig -> {
HttpClient httpClient = HttpClient.create()
.wiretap(endpointConfig.getName(), LogLevel.DEBUG, AdvancedByteBufFormat.TEXTUAL);
ClientHttpConnector clientConnector = new ReactorClientHttpConnector(httpClient);
WebClient webClient = WebClient.builder()
.clientConnector(clientConnector)
.baseUrl(endpointConfig.getBasePath())
.build();
return new SentenceMaker() {
@Override
public String makeSentence(String word) {
return webClient
.post()
.uri("/make-sentence")
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.acceptCharset(StandardCharsets.UTF_8)
.body(BodyInserters.fromValue(word))
.retrieve()
.bodyToMono(String.class)
.block();
}
@Override
public String getName() {
return endpointConfig.getName();
}
};
})
.toList();
}
public List get(){
return sentenceMakersReactive;
}
}
Код: Выделить всё
@ConfigurationProperties(prefix = "endpoints")
@Data
public class ApplicationProperties {
private List endpointConfigList = Collections.emptyList();
}
@NoArgsConstructor
@AllArgsConstructor
@Data
@Builder
public class EndpointConfig {
private String name;
private String basePath;
}
Код: Выделить всё
endpoints:
endpoint-config-list:
-
base-path: http://localhost:8081
name: endpoint-one
-
base-path: http://localhost:8082
name: endpoint-two
Может кто-нибудь помочь мне понять, как этот тупик случается? Если я изменю веб-клиент на клиент rest, он будет работать, но я до сих пор не понимаю, почему один из потоков, созданных CompeletableFuture, не может продолжить вызов конечной точки.
Я ожидал, что веб-клиент для второго вызова также получит поток от реактора для продолжения своей работы.
Подробнее здесь: https://stackoverflow.com/questions/793 ... -webclient
Мобильная версия