Как направить сообщения от одного производителя JMS в одну из нескольких MDB в зависимости от условий?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как направить сообщения от одного производителя JMS в одну из нескольких MDB в зависимости от условий?

Сообщение Anonymous »

У меня есть настройка приложения на основе JMS, в которой одно приложение (сторонний компонент без доступа на уровне кода, но настраиваемый) выступает в качестве единственного производителя сообщений. Этот производитель отправляет сообщения брокеру (ActiveMQ Artemis), и у меня есть несколько компонентов, управляемых сообщениями (MDB), которые должны использовать эти сообщения, но только один MDB должен получать данное сообщение, в зависимости от некоторых критериев (смотря на свойство сообщения, которое отправил производитель).
Моя идея состоит в том, чтобы реализовать собственный подключаемый модуль сервера ActiveMQ Artemis на Java для обработки логики маршрутизации.
Производитель всегда отправляет сообщения в одну родительскую очередь (например, ParentQ). У меня есть несколько потребителей MDB, каждый из которых прослушивает свои дочерние очереди (ChildQ1, ChildQ2, ...). Фактическая маршрутизация должна происходить внутри брокера с использованием перенаправлений Artemis, определенных вbroker.xml.
для написания плагина использовался интерфейс org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin,
@Override
public void beforeSend(ServerSession serverSession,
Transaction transaction,
Message message,
boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {

final String messageAddress =
(message.getAddressSimpleString() != null)
? message.getAddressSimpleString().toString()
: "";

logger.info("[RoutingPlugin] beforeSend: direct={} address={}", direct, messageAddress);

// Apply routing only for configured addresses
if (!monitoredAddresses.contains(messageAddress)) {
return;
}

// Skip if routing property is already set
if (message.containsProperty(destinationPropertyName)) {
return;
}

// Decode message body (UTF-16LE)
final String body = decodeBodyAsUtf16(message);
if (body.isEmpty()) {
logger.error("[RoutingPlugin] Message body is empty for msgId={}", message.getMessageID());
message.putStringProperty("ROUTING_ERROR", "EMPTY_BODY");
return;
}

// Extract required metadata
final String sessionId = extractSimpleField(body, "SessionID");
final String msgType = extractSimpleField(body, "MessageType");
final String clientMsgId = extractSimpleField(body, "ClientMsgID");
final String payload = extractPayload(body);

// These logs reflect the real observed values during runtime
logger.info("[RoutingPlugin] Decoded from body: SessionID={}, MsgType={}, ClientMsgID={}",
sessionId, msgType, clientMsgId);

if (sessionId == null || payload == null) {
logger.error("[RoutingPlugin] DECODE_ERROR:SESSION_OR_PAYLOAD_MISSING for messageID={}",
message.getMessageID());
message.putStringProperty("ROUTING_ERROR", "DECODE_ERROR:SESSION_OR_PAYLOAD_MISSING");
return;
}

// Parse business-level content
try {
ParsedMessage parsed = messageParser.parse(payload, sessionId);

String orderId = parsed.clientOrderId();
String type = parsed.messageTypeCode();

// Determine routing target
String target = determineTarget(orderId, type, sessionId);

// Store routing indicator
message.putStringProperty(destinationPropertyName, target);
logger.info("[RoutingPlugin] Set {} = {}", destinationPropertyName, target);

// Optional message grouping
switch (groupByMode) {
case "id" -> {
if (orderId != null) {
message.putStringProperty(JMS_GROUP_PROPERTY, orderId);
}
}
case "dest" -> message.putStringProperty(JMS_GROUP_PROPERTY, target);
default -> { /* no grouping */ }
}

} catch (Exception ex) {
logger.error("[RoutingPlugin] Message parse failure for msgId={}", message.getMessageID(), ex);
message.putStringProperty("ROUTING_ERROR", "PARSE_FAILURE");
}
}

Дополнительные методы:
/** Reads the message body buffer using UTF-16LE encoding. */
private static String decodeBodyAsUtf16(Message message) {
ActiveMQBuffer buffer = message.getBodyBuffer();
if (buffer.readableBytes() == 0) {
return "";
}

byte[] bytes = new byte[buffer.readableBytes()];
buffer.getBytes(buffer.readerIndex(), bytes);
return new String(bytes, StandardCharsets.UTF_16LE);
}

/**
* Extracts a simple ASCII field that appears in the format:
*
* key value
*
* Control chars are trimmed. Used for fields like:
* SessionID
* MessageType
* ClientMsgID
*/
private static String extractSimpleField(String text, String key) {
int idx = text.indexOf(key);
if (idx < 0) return null;

int i = idx + key.length();
while (i < text.length() && text.charAt(i) 0x20) i++;

return (start < i) ? text.substring(start, i) : null;
}

/**
* Extracts business payload following an STX (0x02) control character
* that appears after the "EventData" marker.
*/
private static String extractPayload(String text) {
int marker = text.indexOf("EventData");
if (marker < 0) return null;

int stx = text.indexOf('\u0002', marker);
if (stx < 0 || stx + 1 >= text.length()) return null;

int start = stx + 1;
int end = text.length();

while (end > start && text.charAt(end - 1) == '\u0000') {
end--;
}

return text.substring(start, end);
}

/**
* Determines routing target based on business identifiers.
* Actual logic is environment-specific.
*/
private String determineTarget(String orderId, String type, String sessionId) {

// Production-grade placeholder routing logic
if (orderId != null && orderId.startsWith("A")) {
return "DEST_A";
}

if ("SESSION_X".equals(sessionId)) {
return "DEST_B";
}

return "DEST_DEFAULT";
}

но моя проблема в том, что я не смог правильно проанализировать сообщение, как сказано в журналах ниже, оно извлекает нулевые значения;
[RoutingPlugin] beforeSend: direct=true address=FromSellSide
[RoutingPlugin] Decoded from body: SessionID=null, MsgType=null, ClientMsgID=
[RoutingPlugin] DECODE_ERROR:SESSION_OR_FIX_MISSING for messageID=0

Как правильно проанализировать тело сообщения org.apache.activemq.artemis.api.core.Message?
пример сообщения отображается в ParentQ:
bytes:
01 00 00 00 08 00 00 00 12 53 00 65 00 73 00 73 00 69 00 6f 00 6e 00 49 00 44 00 0a 00 00 00 12 43 00 4f 00 4d 00 4d 00 4f 00 4e 00 45 00 58 00 45 00 00 00 00 12 45 00 76 00 65 00 6e 00 74 00 44 00 61 00 74 00 61 00 0a 00 00 02 4c 38 00 3d 00 46 00 49 00 58 00 2e 00 34 00 2e 00 32 00 01 00 39 00 3d 00 32 00 37 00 31 00 01 00 33 00 35 00 3d 00 44 00 01 00 33 00 34 00 3d 00 31 00 39 00 01 00 34 00 33 00 3d 00 59 00 01 00 34 00 39 00 3d 00 53 00 49 00 4d 00 55 00 4c 00 41 00 54 00 4f 00 52 00 01 00 35 00 30 00 3d 00 35 00 32 00 34 00 33 00 33 00 34 00 65 00 66 00 72 00 01 00 35 00 32 00 3d 00 32 00 30 00 32 00 35 00 31 00 31 00 32 00 31 00 2d 00 30 00 34 00 3a 00 32 00 33 00 3a 00 33 00 34 00 2e 00 32 00 35 00 36 00 01 00 35 00 36 00 3d 00 4f 00 4d 00 53 00 5f

text:
SessionID
COMMONEXEEventData
L8=FIX.4.29=27135=D34=1943=Y49=SIMULATOR50=524334efr52=20251121-04:23:34.25656=OMS_


Подробнее здесь: https://stackoverflow.com/questions/798 ... bs-based-o
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»