Сообщения не отправляются в очередь. RabbitMQ + Apache CamelJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Сообщения не отправляются в очередь. RabbitMQ + Apache Camel

Сообщение Anonymous »

Я пытаюсь протестировать RabbitMQ и Apache Camel. События регистрируются в обменнике, но в очередь не отправляются. Скорость сообщений меняется в Exchange и Queue (Consumers ACK).

Сервис Apache Camel взаимодействует с RabbitMQ, соединение есть, логи доступны.
Отладка ответа работает только в том случае, если вы публикуете сообщение через пользовательский интерфейс в Rabbit.

Код:
package com.example.camel_demo.routes;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
import java.util.Map;

@Component
public class CommandRoute extends RouteBuilder {
@Override
public void configure() {

from("rabbitmq:exchange.commands"
+ "?queue=integration.commands"
+ "&routingKey=integration.commands"
+ "&exchangeType=direct"
+ "&username=&password="
+ "&durable=true"
+ "&autoDelete=false"
+ "&autoAck=false"
+ "&declare=true")
.routeId("command-route")
.convertBodyTo(String.class)
.log("COMMAND ROUTE: received message - ${body}")
.unmarshal().json()

.process(exchange -> {
Map body = exchange.getIn().getBody(Map.class);
String sourceType = (String) body.get("sourceType");
exchange.setProperty("sourceType", sourceType);
})

.choice()
.when(simple("${exchangeProperty.sourceType} == 'json'"))
.log("Routing to JSON Adapter")
.to("direct:json-adapter")
.otherwise()
.log("Unknown sourceType: ${exchangeProperty.sourceType}")
.process(exchange -> {
Map body = exchange.getIn().getBody(Map.class);
String requestId = (String) body.get("requestId");
Map error = new HashMap();
error.put("error", "Unknown source type");
error.put("requestId", requestId);
exchange.getIn().setBody(error);
})
.marshal().json()
.to("direct:response-handler")
.end();
}
}

package com.example.camel_demo.routes;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class JsonAdapterRoute extends RouteBuilder {
@Override
public void configure() {

from("direct:json-adapter")
.routeId("json-adapter")
.log("JSON ADAPTER: processing request")

.setProperty("originalBody", body())

.process(exchange -> {
Map body = exchange.getIn().getBody(Map.class);
String requestId = (String) body.get("requestId");
String baseUrl = (String) body.get("baseUrl");

exchange.setProperty("requestId", requestId);
exchange.setProperty("baseUrl", baseUrl);
})

.log("Target URL: ${exchangeProperty.baseUrl}")
.log("Request ID: ${exchangeProperty.requestId}")

// HTTP запрос
.setBody(constant(null))
.toD("${exchangeProperty.baseUrl}?bridgeEndpoint=true")

// Формируем ответ
.process(exchange -> {
String requestId = exchange.getProperty("requestId", String.class);
String responseData = exchange.getIn().getBody(String.class);

Map response = new HashMap();
response.put("requestId", requestId);
response.put("status", "OK");
response.put("data", responseData);

exchange.getIn().setBody(response);
})
.marshal().json()
.log("Response ready: ${body}")

.to("direct:response-handler");
}
}

package com.example.camel_demo.routes;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ResponseHandlerRoute extends RouteBuilder {
@Override
public void configure() {

from("direct:response-handler")
.routeId("response-handler")
.log("RESPONSE HANDLER: sending to RabbitMQ")
.to("rabbitmq:exchange.responses"
+ "?queue=integration.responses"
+ "&routingKey=integration.responses"
+ "&exchangeType=direct"
+ "&username=&password="
+ "&durable=true"
+ "&autoDelete=false"
+ "&autoAck=false"
+ "&declare=true")
.log("Response sent to queue");
}
}

package com.example.camel_demo.routes;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class ResponseDebugRoute extends RouteBuilder {
@Override
public void configure() {

from("rabbitmq:exchange.responses"
+ "?queue=integration.responses"
+ "&routingKey=integration.responses"
+ "&exchangeType=direct"
+ "&username=&password="
+ "&durable=true"
+ "&autoDelete=false"
+ "&autoAck=false"
+ "&declare=true")
.routeId("response-debug-route")
.convertBodyTo(String.class)
.log("RESPONSE RECEIVED: ${body}");
}
}


Подробнее здесь: https://stackoverflow.com/questions/798 ... ache-camel
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»