Новое в Quarkus и Reactive
Создание реактивного приложения в Quarkus с моделью запрос-ответ
Ресурс/контроллер получает запрос с его контекстом и отправляет запрос в нужный контекстный канал. Здесь Ресурс для разблокировки потока HTTP с помощью Future или UNI. Потребитель прослушивает канал, чтобы прослушать запрос, применить операцию и отправить ответ в канал ответа. Теперь Resource Future/Uni необходимо получить ответ и вернуть ответ пользователю.
Ниже приведены примеры кода.
Resource< /p>
@ApplicationScoped
public class MyCustomResource {
private static final ConcurrentHashMap responseMap = new ConcurrentHashMap();
@Inject
private MyCustomConfig MyCustomConfig;
@Inject
@Channel("request-channel")
Emitter requestEmitter;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/{contextName}/MyCustom")
public Uni submitRequest(@PathParam("contextName") String contextName, RequestPayload payload) {
String requestId = UUID.randomUUID().toString();
payload.setRequestId(requestId);
payload.setInstanceId(MyCustomConfig.apiInstance());
MyCustomConfig.ContextConfig contextConfig = getContextByName(contextName);
if (contextConfig == null) {
return Uni.createFrom().item(Response.status(Response.Status.NOT_FOUND).entity("Context not found: " +contextName).build());
}
String targetQueueName = contextConfig.requestQueueName() != null ? contextConfig.requestQueueName() : MyCustomConfig.requestQueueName();
CompletableFuture responseFuture = new CompletableFuture();
responseMap.put(requestId, responseFuture);
OutgoingRabbitMQMetadata metadata = OutgoingRabbitMQMetadata.builder()
.withRoutingKey(targetQueueName).withCorrelationId(requestId).build();
Message message = Message.of(payload).withMetadata(Collections.singleton(metadata));
return Uni.createFrom().item(() -> {
requestEmitter.send(message);
return "Request sent successfully";
}).flatMap(v -> Uni.createFrom().completionStage(responseFuture)).ifNoItem().after(Duration.ofMillis(contextConfig.responseWaitTimeoutMS()))
.failWith(() -> new RuntimeException("Response timeout for requestId: " + requestId));
}
private MyCustomConfig.ContextConfig getContextByName(String name) {
return MyCustomConfig.contexts().stream().filter(ctx -> ctx.name().equals(name)).findFirst().orElse(null);
}
@Incoming("response-channel")
public void onResponseMessage(ResponsePayload response) {
String correlationId = response.getRequestId();
CompletableFuture futureResponse = responseMap.remove(correlationId);
futureResponse.complete(Response.ok(response).build());
}
}
Потребитель
@ApplicationScoped
public class MyCustomConsumer {
@Inject
MyCustomConfig MyCustomConfig;
@Inject
@Channel("response-channel")
Emitter responseEmitter;
@Incoming("request-channel")
public void onMessage(Message message) {
RequestPayload request = message.getPayload();
String responseText = request.toString() + ", Processed Successfully";
ResponsePayload respPayload = new ResponsePayload();
respPayload.setRequestId(request.getRequestId());
respPayload.setResponse(responseText);
OutgoingRabbitMQMetadata metadata = OutgoingRabbitMQMetadata.builder()
.withRoutingKey(request.getInstanceId()).withCorrelationId(request.getRequestId()).build();
Message responseMessage = Message.of(respPayload).addMetadata(metadata);
responseEmitter.send(responseMessage);
}
}
application.properties
mp.messaging.outgoing.response-channel.connector=smallrye-rabbitmq
mp.messaging.outgoing.response-channel.host=localhost
mp.messaging.outgoing.response-channel.port=5672
mp.messaging.outgoing.response-channel.username=guest
mp.messaging.outgoing.response-channel.password=guest
mp.messaging.outgoing.response-channel.address=valueadd_response
mp.messaging.outgoing.response-channel.concurrency=1
mp.messaging.incoming.request-channel.connector=smallrye-rabbitmq
mp.messaging.incoming.request-channel.host=localhost
mp.messaging.incoming.request-channel.port=5672
mp.messaging.incoming.request-channel.username=guest
mp.messaging.incoming.request-channel.password=guest
mp.messaging.incoming.request-channel.address=valueadd_request
mp.messaging.incoming.request-channel.concurrency=1
Проблема, как показано ниже
Suppressed: io.smallrye.reactive.messaging.providers.wiring.TooManyDownstreamCandidatesException: 'Emitter{channel:'response-channel'}' supports a single downstream consumer, but found 2: [SubscriberMethod{method:'onResponseMessage', incoming:'response-channel'}, OutgoingConnector{channel:'response-channel', attribute:'mp.messaging.outgoing.response-channel'}]. You may want to enable broadcast using '@Broadcast' on the injected emitter field.
at io.smallrye.reactive.messaging.providers.wiring.Wiring$EmitterComponent.validate(Wiring.java:577)
at io.smallrye.reactive.messaging.providers.wiring.Graph.(Graph.java:67)
at io.smallrye.reactive.messaging.providers.wiring.Wiring.resolve(Wiring.java:211)
at io.smallrye.reactive.messaging.providers.wiring.Wiring_ClientProxy.resolve(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:253)
... 21 more
Suppressed: io.smallrye.reactive.messaging.providers.wiring.TooManyUpstreamCandidatesException: 'SubscriberMethod{method:'onMessage', incoming:'request-channel'}' supports a single upstream producer, but found 2: [Emitter{channel:'request-channel'}, IncomingConnector{channel:'request-channel', attribute:'mp.messaging.incoming.request-channel' , concurrency:'1'}]. You may want to add the '@Merge' annotation on the method.
at io.smallrye.reactive.messaging.providers.wiring.Wiring$SubscriberMediatorComponent.validate(Wiring.java:773)
at io.smallrye.reactive.messaging.providers.wiring.Wiring$ConcurrentComponent.validate(Wiring.java:970)
at io.smallrye.reactive.messaging.providers.wiring.Graph.(Graph.java:67)
at io.smallrye.reactive.messaging.providers.wiring.Wiring.resolve(Wiring.java:211)
at io.smallrye.reactive.messaging.providers.wiring.Wiring_ClientProxy.resolve(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:253)
... 21 more
Подробнее здесь: https://stackoverflow.com/questions/790 ... -messaging
Как добиться неблокирующего HTTP-реактивного обмена сообщениями Quarkus ⇐ JAVA
Программисты JAVA общаются здесь
1728106649
Anonymous
Новое в Quarkus и Reactive
Создание реактивного приложения в Quarkus с моделью запрос-ответ
Ресурс/контроллер получает запрос с его контекстом и отправляет запрос в нужный контекстный канал. Здесь Ресурс для разблокировки потока HTTP с помощью Future или UNI. Потребитель прослушивает канал, чтобы прослушать запрос, применить операцию и отправить ответ в канал ответа. Теперь Resource Future/Uni необходимо получить ответ и вернуть ответ пользователю.
Ниже приведены примеры кода.
[b]Resource[/b]< /p>
@ApplicationScoped
public class MyCustomResource {
private static final ConcurrentHashMap responseMap = new ConcurrentHashMap();
@Inject
private MyCustomConfig MyCustomConfig;
@Inject
@Channel("request-channel")
Emitter requestEmitter;
@POST
@Consumes(MediaType.APPLICATION_JSON)
@Path("/{contextName}/MyCustom")
public Uni submitRequest(@PathParam("contextName") String contextName, RequestPayload payload) {
String requestId = UUID.randomUUID().toString();
payload.setRequestId(requestId);
payload.setInstanceId(MyCustomConfig.apiInstance());
MyCustomConfig.ContextConfig contextConfig = getContextByName(contextName);
if (contextConfig == null) {
return Uni.createFrom().item(Response.status(Response.Status.NOT_FOUND).entity("Context not found: " +contextName).build());
}
String targetQueueName = contextConfig.requestQueueName() != null ? contextConfig.requestQueueName() : MyCustomConfig.requestQueueName();
CompletableFuture responseFuture = new CompletableFuture();
responseMap.put(requestId, responseFuture);
OutgoingRabbitMQMetadata metadata = OutgoingRabbitMQMetadata.builder()
.withRoutingKey(targetQueueName).withCorrelationId(requestId).build();
Message message = Message.of(payload).withMetadata(Collections.singleton(metadata));
return Uni.createFrom().item(() -> {
requestEmitter.send(message);
return "Request sent successfully";
}).flatMap(v -> Uni.createFrom().completionStage(responseFuture)).ifNoItem().after(Duration.ofMillis(contextConfig.responseWaitTimeoutMS()))
.failWith(() -> new RuntimeException("Response timeout for requestId: " + requestId));
}
private MyCustomConfig.ContextConfig getContextByName(String name) {
return MyCustomConfig.contexts().stream().filter(ctx -> ctx.name().equals(name)).findFirst().orElse(null);
}
@Incoming("response-channel")
public void onResponseMessage(ResponsePayload response) {
String correlationId = response.getRequestId();
CompletableFuture futureResponse = responseMap.remove(correlationId);
futureResponse.complete(Response.ok(response).build());
}
}
[b]Потребитель[/b]
@ApplicationScoped
public class MyCustomConsumer {
@Inject
MyCustomConfig MyCustomConfig;
@Inject
@Channel("response-channel")
Emitter responseEmitter;
@Incoming("request-channel")
public void onMessage(Message message) {
RequestPayload request = message.getPayload();
String responseText = request.toString() + ", Processed Successfully";
ResponsePayload respPayload = new ResponsePayload();
respPayload.setRequestId(request.getRequestId());
respPayload.setResponse(responseText);
OutgoingRabbitMQMetadata metadata = OutgoingRabbitMQMetadata.builder()
.withRoutingKey(request.getInstanceId()).withCorrelationId(request.getRequestId()).build();
Message responseMessage = Message.of(respPayload).addMetadata(metadata);
responseEmitter.send(responseMessage);
}
}
[b]application.properties[/b]
mp.messaging.outgoing.response-channel.connector=smallrye-rabbitmq
mp.messaging.outgoing.response-channel.host=localhost
mp.messaging.outgoing.response-channel.port=5672
mp.messaging.outgoing.response-channel.username=guest
mp.messaging.outgoing.response-channel.password=guest
mp.messaging.outgoing.response-channel.address=valueadd_response
mp.messaging.outgoing.response-channel.concurrency=1
mp.messaging.incoming.request-channel.connector=smallrye-rabbitmq
mp.messaging.incoming.request-channel.host=localhost
mp.messaging.incoming.request-channel.port=5672
mp.messaging.incoming.request-channel.username=guest
mp.messaging.incoming.request-channel.password=guest
mp.messaging.incoming.request-channel.address=valueadd_request
mp.messaging.incoming.request-channel.concurrency=1
[b]Проблема, как показано ниже[/b]
Suppressed: io.smallrye.reactive.messaging.providers.wiring.TooManyDownstreamCandidatesException: 'Emitter{channel:'response-channel'}' supports a single downstream consumer, but found 2: [SubscriberMethod{method:'onResponseMessage', incoming:'response-channel'}, OutgoingConnector{channel:'response-channel', attribute:'mp.messaging.outgoing.response-channel'}]. You may want to enable broadcast using '@Broadcast' on the injected emitter field.
at io.smallrye.reactive.messaging.providers.wiring.Wiring$EmitterComponent.validate(Wiring.java:577)
at io.smallrye.reactive.messaging.providers.wiring.Graph.(Graph.java:67)
at io.smallrye.reactive.messaging.providers.wiring.Wiring.resolve(Wiring.java:211)
at io.smallrye.reactive.messaging.providers.wiring.Wiring_ClientProxy.resolve(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:253)
... 21 more
Suppressed: io.smallrye.reactive.messaging.providers.wiring.TooManyUpstreamCandidatesException: 'SubscriberMethod{method:'onMessage', incoming:'request-channel'}' supports a single upstream producer, but found 2: [Emitter{channel:'request-channel'}, IncomingConnector{channel:'request-channel', attribute:'mp.messaging.incoming.request-channel' , concurrency:'1'}]. You may want to add the '@Merge' annotation on the method.
at io.smallrye.reactive.messaging.providers.wiring.Wiring$SubscriberMediatorComponent.validate(Wiring.java:773)
at io.smallrye.reactive.messaging.providers.wiring.Wiring$ConcurrentComponent.validate(Wiring.java:970)
at io.smallrye.reactive.messaging.providers.wiring.Graph.(Graph.java:67)
at io.smallrye.reactive.messaging.providers.wiring.Wiring.resolve(Wiring.java:211)
at io.smallrye.reactive.messaging.providers.wiring.Wiring_ClientProxy.resolve(Unknown Source)
at io.smallrye.reactive.messaging.providers.extension.MediatorManager.start(MediatorManager.java:253)
... 21 more
Подробнее здесь: [url]https://stackoverflow.com/questions/79056398/how-to-achieve-quarkus-non-blocking-http-reactive-messaging[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия