Как добиться неблокирующего HTTP-реактивного обмена сообщениями QuarkusJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как добиться неблокирующего HTTP-реактивного обмена сообщениями Quarkus

Сообщение Anonymous »

Новое в Quarkus и Reactive
Создание реактивного приложения в Quarkus с моделью запрос-ответ
Ресурс/контроллер получает запрос с его контекстом и отправляет запрос в нужный контекстный канал. Здесь Ресурс для разблокировки потока HTTP с помощью Future или UNI. Потребитель прослушивает канал, чтобы прослушать запрос, применить операцию и отправить ответ в канал ответа. Теперь Resource Future/Uni необходимо получить ответ и вернуть ответ пользователю.
Ниже приведены примеры кода.
Resource< /p>
@ApplicationScoped
public class MyCustomResource {
private static final ConcurrentHashMap responseMap = new ConcurrentHashMap();
@Inject
private MyCustomConfig MyCustomConfig;
@Inject
@Channel("request-channel")
Emitter requestEmitter;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/{contextName}/MyCustom")
public Uni submitRequest(@PathParam("contextName") String contextName, RequestPayload payload) {
String requestId = UUID.randomUUID().toString();
payload.setRequestId(requestId);
payload.setInstanceId(MyCustomConfig.apiInstance());
MyCustomConfig.ContextConfig contextConfig = getContextByName(contextName);
if (contextConfig == null) {
return Uni.createFrom().item(Response.status(Response.Status.NOT_FOUND).entity("Context not found: " +contextName).build());
}
String targetQueueName = contextConfig.requestQueueName() != null ? contextConfig.requestQueueName() : MyCustomConfig.requestQueueName();
CompletableFuture responseFuture = new CompletableFuture();
responseMap.put(requestId, responseFuture);
OutgoingRabbitMQMetadata metadata = OutgoingRabbitMQMetadata.builder()
.withRoutingKey(targetQueueName).withCorrelationId(requestId).build();
Message message = Message.of(payload).withMetadata(Collections.singleton(metadata));
return Uni.createFrom().item(() -> {
requestEmitter.send(message);
return "Request sent successfully";
}).flatMap(v -> Uni.createFrom().completionStage(responseFuture)).ifNoItem().after(Duration.ofMillis(contextConfig.responseWaitTimeoutMS()))
.failWith(() -> new RuntimeException("Response timeout for requestId: " + requestId));
}
private MyCustomConfig.ContextConfig getContextByName(String name) {
return MyCustomConfig.contexts().stream().filter(ctx -> ctx.name().equals(name)).findFirst().orElse(null);
}
@Incoming("response-channel")
public void onResponseMessage(ResponsePayload response) {
String correlationId = response.getRequestId();
CompletableFuture futureResponse = responseMap.remove(correlationId);
futureResponse.complete(Response.ok(response).build());
}
}

Потребитель
@ApplicationScoped
public class MyCustomConsumer {
@Inject
MyCustomConfig MyCustomConfig;
@Inject
@Channel("response-channel")
Emitter responseEmitter;
@Incoming("request-channel")
public void onMessage(Message message) {
RequestPayload request = message.getPayload();
String responseText = request.toString() + ", Processed Successfully";
ResponsePayload respPayload = new ResponsePayload();
respPayload.setRequestId(request.getRequestId());
respPayload.setResponse(responseText);
OutgoingRabbitMQMetadata metadata = OutgoingRabbitMQMetadata.builder()
.withRoutingKey(request.getInstanceId()).withCorrelationId(request.getRequestId()).build();
Message responseMessage = Message.of(respPayload).addMetadata(metadata);
responseEmitter.send(responseMessage);
}
}

application.properties
mp.messaging.outgoing.response-channel.connector=smallrye-rabbitmq
mp.messaging.outgoing.response-channel.host=localhost
mp.messaging.outgoing.response-channel.port=5672
mp.messaging.outgoing.response-channel.username=guest
mp.messaging.outgoing.response-channel.password=guest
mp.messaging.outgoing.response-channel.address=valueadd_response
mp.messaging.outgoing.response-channel.concurrency=1

mp.messaging.incoming.request-channel.connector=smallrye-rabbitmq
mp.messaging.incoming.request-channel.host=localhost
mp.messaging.incoming.request-channel.port=5672
mp.messaging.incoming.request-channel.username=guest
mp.messaging.incoming.request-channel.password=guest
mp.messaging.incoming.request-channel.address=valueadd_request
mp.messaging.incoming.request-channel.concurrency=1

Проблема, как показано ниже
Suppressed: io.smallrye.reactive.messaging.providers.wiring.TooManyDownstreamCandidatesException: 'Emitter{channel:'response-channel'}' supports a single downstream consumer, but found 2: [SubscriberMethod{method:'onResponseMessage', incoming:'response-channel'}, OutgoingConnector{channel:'response-channel', attribute:'mp.messaging.outgoing.response-channel'}]. You may want to enable broadcast using '@Broadcast' on the injected emitter field.
at io.smallrye.reactive.messaging.providers.wiring.Wiring$EmitterComponent.validate(Wiring.java:577)
at io.smallrye.reactive.messaging.providers.wiring.Graph.(Graph.java:67)
at io.smallrye.reactive.messaging.providers.wiring.Wiring.resolve(Wiring.java:211)
at io.smallrye.reactive.messaging.providers.wiring.Wiring_ClientProxy.resolve(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:253)
... 21 more
Suppressed: io.smallrye.reactive.messaging.providers.wiring.TooManyUpstreamCandidatesException: 'SubscriberMethod{method:'onMessage', incoming:'request-channel'}' supports a single upstream producer, but found 2: [Emitter{channel:'request-channel'}, IncomingConnector{channel:'request-channel', attribute:'mp.messaging.incoming.request-channel' , concurrency:'1'}]. You may want to add the '@Merge' annotation on the method.
at io.smallrye.reactive.messaging.providers.wiring.Wiring$SubscriberMediatorComponent.validate(Wiring.java:773)
at io.smallrye.reactive.messaging.providers.wiring.Wiring$ConcurrentComponent.validate(Wiring.java:970)
at io.smallrye.reactive.messaging.providers.wiring.Graph.(Graph.java:67)
at io.smallrye.reactive.messaging.providers.wiring.Wiring.resolve(Wiring.java:211)
at io.smallrye.reactive.messaging.providers.wiring.Wiring_ClientProxy.resolve(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:253)
... 21 more


Подробнее здесь: https://stackoverflow.com/questions/790 ... -messaging
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»