У меня есть настройка приложения на основе JMS, в которой одно приложение (сторонний компонент без доступа на уровне кода, но настраиваемый) выступает в качестве единственного производителя сообщений. Этот производитель отправляет сообщения брокеру (ActiveMQ Artemis), и у меня есть несколько компонентов, управляемых сообщениями (MDB), которые должны использовать эти сообщения, но только один MDB должен получать данное сообщение, в зависимости от некоторых критериев (смотря на свойство сообщения, которое отправил производитель).
Моя идея состоит в том, чтобы реализовать собственный подключаемый модуль сервера ActiveMQ Artemis на Java для обработки логики маршрутизации.
Производитель всегда отправляет сообщения в одну родительскую очередь (например, ParentQ). У меня есть несколько потребителей MDB, каждый из которых прослушивает свои дочерние очереди (ChildQ1, ChildQ2, ...). Фактическая маршрутизация должна происходить внутри брокера с использованием перенаправлений Artemis, определенных вbroker.xml.
для написания плагина использовался интерфейс org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin,
@Override
public void beforeSend(ServerSession serverSession,
Transaction transaction,
Message message,
boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {
final String messageAddress =
(message.getAddressSimpleString() != null)
? message.getAddressSimpleString().toString()
: "";
logger.info("[RoutingPlugin] beforeSend: direct={} address={}", direct, messageAddress);
// Apply routing only for configured addresses
if (!monitoredAddresses.contains(messageAddress)) {
return;
}
// Skip if routing property is already set
if (message.containsProperty(destinationPropertyName)) {
return;
}
// Decode message body (UTF-16LE)
final String body = decodeBodyAsUtf16(message);
if (body.isEmpty()) {
logger.error("[RoutingPlugin] Message body is empty for msgId={}", message.getMessageID());
message.putStringProperty("ROUTING_ERROR", "EMPTY_BODY");
return;
}
// Extract required metadata
final String sessionId = extractSimpleField(body, "SessionID");
final String msgType = extractSimpleField(body, "MessageType");
final String clientMsgId = extractSimpleField(body, "ClientMsgID");
final String payload = extractPayload(body);
// These logs reflect the real observed values during runtime
logger.info("[RoutingPlugin] Decoded from body: SessionID={}, MsgType={}, ClientMsgID={}",
sessionId, msgType, clientMsgId);
if (sessionId == null || payload == null) {
logger.error("[RoutingPlugin] DECODE_ERROR:SESSION_OR_PAYLOAD_MISSING for messageID={}",
message.getMessageID());
message.putStringProperty("ROUTING_ERROR", "DECODE_ERROR:SESSION_OR_PAYLOAD_MISSING");
return;
}
// Parse business-level content
try {
ParsedMessage parsed = messageParser.parse(payload, sessionId);
String orderId = parsed.clientOrderId();
String type = parsed.messageTypeCode();
// Determine routing target
String target = determineTarget(orderId, type, sessionId);
// Store routing indicator
message.putStringProperty(destinationPropertyName, target);
logger.info("[RoutingPlugin] Set {} = {}", destinationPropertyName, target);
// Optional message grouping
switch (groupByMode) {
case "id" -> {
if (orderId != null) {
message.putStringProperty(JMS_GROUP_PROPERTY, orderId);
}
}
case "dest" -> message.putStringProperty(JMS_GROUP_PROPERTY, target);
default -> { /* no grouping */ }
}
} catch (Exception ex) {
logger.error("[RoutingPlugin] Message parse failure for msgId={}", message.getMessageID(), ex);
message.putStringProperty("ROUTING_ERROR", "PARSE_FAILURE");
}
}
Дополнительные методы:
/** Reads the message body buffer using UTF-16LE encoding. */
private static String decodeBodyAsUtf16(Message message) {
ActiveMQBuffer buffer = message.getBodyBuffer();
if (buffer.readableBytes() == 0) {
return "";
}
byte[] bytes = new byte[buffer.readableBytes()];
buffer.getBytes(buffer.readerIndex(), bytes);
return new String(bytes, StandardCharsets.UTF_16LE);
}
/**
* Extracts a simple ASCII field that appears in the format:
*
* key value
*
* Control chars are trimmed. Used for fields like:
* SessionID
* MessageType
* ClientMsgID
*/
private static String extractSimpleField(String text, String key) {
int idx = text.indexOf(key);
if (idx < 0) return null;
int i = idx + key.length();
while (i < text.length() && text.charAt(i) 0x20) i++;
return (start < i) ? text.substring(start, i) : null;
}
/**
* Extracts business payload following an STX (0x02) control character
* that appears after the "EventData" marker.
*/
private static String extractPayload(String text) {
int marker = text.indexOf("EventData");
if (marker < 0) return null;
int stx = text.indexOf('\u0002', marker);
if (stx < 0 || stx + 1 >= text.length()) return null;
int start = stx + 1;
int end = text.length();
while (end > start && text.charAt(end - 1) == '\u0000') {
end--;
}
return text.substring(start, end);
}
/**
* Determines routing target based on business identifiers.
* Actual logic is environment-specific.
*/
private String determineTarget(String orderId, String type, String sessionId) {
// Production-grade placeholder routing logic
if (orderId != null && orderId.startsWith("A")) {
return "DEST_A";
}
if ("SESSION_X".equals(sessionId)) {
return "DEST_B";
}
return "DEST_DEFAULT";
}
но моя проблема в том, что я не смог правильно проанализировать сообщение, как сказано в журналах ниже, оно извлекает нулевые значения;
[RoutingPlugin] beforeSend: direct=true address=FromSellSide
[RoutingPlugin] Decoded from body: SessionID=null, MsgType=null, ClientMsgID=
[RoutingPlugin] DECODE_ERROR:SESSION_OR_FIX_MISSING for messageID=0
Как правильно проанализировать тело сообщения org.apache.activemq.artemis.api.core.Message?
пример сообщения отображается в ParentQ:
bytes:
01 00 00 00 08 00 00 00 12 53 00 65 00 73 00 73 00 69 00 6f 00 6e 00 49 00 44 00 0a 00 00 00 12 43 00 4f 00 4d 00 4d 00 4f 00 4e 00 45 00 58 00 45 00 00 00 00 12 45 00 76 00 65 00 6e 00 74 00 44 00 61 00 74 00 61 00 0a 00 00 02 4c 38 00 3d 00 46 00 49 00 58 00 2e 00 34 00 2e 00 32 00 01 00 39 00 3d 00 32 00 37 00 31 00 01 00 33 00 35 00 3d 00 44 00 01 00 33 00 34 00 3d 00 31 00 39 00 01 00 34 00 33 00 3d 00 59 00 01 00 34 00 39 00 3d 00 53 00 49 00 4d 00 55 00 4c 00 41 00 54 00 4f 00 52 00 01 00 35 00 30 00 3d 00 35 00 32 00 34 00 33 00 33 00 34 00 65 00 66 00 72 00 01 00 35 00 32 00 3d 00 32 00 30 00 32 00 35 00 31 00 31 00 32 00 31 00 2d 00 30 00 34 00 3a 00 32 00 33 00 3a 00 33 00 34 00 2e 00 32 00 35 00 36 00 01 00 35 00 36 00 3d 00 4f 00 4d 00 53 00 5f
text:
SessionID
COMMONEXEEventData
L8=FIX.4.29=27135=D34=1943=Y49=SIMULATOR50=524334efr52=20251121-04:23:34.25656=OMS_
Подробнее здесь: https://stackoverflow.com/questions/798 ... bs-based-o
Как направить сообщения от одного производителя JMS в одну из нескольких MDB в зависимости от условий? ⇐ JAVA
Программисты JAVA общаются здесь
1763958262
Anonymous
У меня есть настройка приложения на основе JMS, в которой одно приложение (сторонний компонент без доступа на уровне кода, но настраиваемый) выступает в качестве единственного производителя сообщений. Этот производитель отправляет сообщения брокеру (ActiveMQ Artemis), и у меня есть несколько компонентов, управляемых сообщениями (MDB), которые должны использовать эти сообщения, но только один MDB должен получать данное сообщение, в зависимости от некоторых критериев (смотря на свойство сообщения, которое отправил производитель).
Моя идея состоит в том, чтобы реализовать собственный подключаемый модуль сервера ActiveMQ Artemis на Java для обработки логики маршрутизации.
Производитель всегда отправляет сообщения в одну родительскую очередь (например, ParentQ). У меня есть несколько потребителей MDB, каждый из которых прослушивает свои дочерние очереди (ChildQ1, ChildQ2, ...). Фактическая маршрутизация должна происходить внутри брокера с использованием перенаправлений Artemis, определенных вbroker.xml.
для написания плагина использовался интерфейс org.apache.activemq.artemis.core.server.plugin.ActiveMQServerPlugin,
@Override
public void beforeSend(ServerSession serverSession,
Transaction transaction,
Message message,
boolean direct,
boolean noAutoCreateQueue) throws ActiveMQException {
final String messageAddress =
(message.getAddressSimpleString() != null)
? message.getAddressSimpleString().toString()
: "";
logger.info("[RoutingPlugin] beforeSend: direct={} address={}", direct, messageAddress);
// Apply routing only for configured addresses
if (!monitoredAddresses.contains(messageAddress)) {
return;
}
// Skip if routing property is already set
if (message.containsProperty(destinationPropertyName)) {
return;
}
// Decode message body (UTF-16LE)
final String body = decodeBodyAsUtf16(message);
if (body.isEmpty()) {
logger.error("[RoutingPlugin] Message body is empty for msgId={}", message.getMessageID());
message.putStringProperty("ROUTING_ERROR", "EMPTY_BODY");
return;
}
// Extract required metadata
final String sessionId = extractSimpleField(body, "SessionID");
final String msgType = extractSimpleField(body, "MessageType");
final String clientMsgId = extractSimpleField(body, "ClientMsgID");
final String payload = extractPayload(body);
// These logs reflect the real observed values during runtime
logger.info("[RoutingPlugin] Decoded from body: SessionID={}, MsgType={}, ClientMsgID={}",
sessionId, msgType, clientMsgId);
if (sessionId == null || payload == null) {
logger.error("[RoutingPlugin] DECODE_ERROR:SESSION_OR_PAYLOAD_MISSING for messageID={}",
message.getMessageID());
message.putStringProperty("ROUTING_ERROR", "DECODE_ERROR:SESSION_OR_PAYLOAD_MISSING");
return;
}
// Parse business-level content
try {
ParsedMessage parsed = messageParser.parse(payload, sessionId);
String orderId = parsed.clientOrderId();
String type = parsed.messageTypeCode();
// Determine routing target
String target = determineTarget(orderId, type, sessionId);
// Store routing indicator
message.putStringProperty(destinationPropertyName, target);
logger.info("[RoutingPlugin] Set {} = {}", destinationPropertyName, target);
// Optional message grouping
switch (groupByMode) {
case "id" -> {
if (orderId != null) {
message.putStringProperty(JMS_GROUP_PROPERTY, orderId);
}
}
case "dest" -> message.putStringProperty(JMS_GROUP_PROPERTY, target);
default -> { /* no grouping */ }
}
} catch (Exception ex) {
logger.error("[RoutingPlugin] Message parse failure for msgId={}", message.getMessageID(), ex);
message.putStringProperty("ROUTING_ERROR", "PARSE_FAILURE");
}
}
Дополнительные методы:
/** Reads the message body buffer using UTF-16LE encoding. */
private static String decodeBodyAsUtf16(Message message) {
ActiveMQBuffer buffer = message.getBodyBuffer();
if (buffer.readableBytes() == 0) {
return "";
}
byte[] bytes = new byte[buffer.readableBytes()];
buffer.getBytes(buffer.readerIndex(), bytes);
return new String(bytes, StandardCharsets.UTF_16LE);
}
/**
* Extracts a simple ASCII field that appears in the format:
*
* key value
*
* Control chars are trimmed. Used for fields like:
* SessionID
* MessageType
* ClientMsgID
*/
private static String extractSimpleField(String text, String key) {
int idx = text.indexOf(key);
if (idx < 0) return null;
int i = idx + key.length();
while (i < text.length() && text.charAt(i) 0x20) i++;
return (start < i) ? text.substring(start, i) : null;
}
/**
* Extracts business payload following an STX (0x02) control character
* that appears after the "EventData" marker.
*/
private static String extractPayload(String text) {
int marker = text.indexOf("EventData");
if (marker < 0) return null;
int stx = text.indexOf('\u0002', marker);
if (stx < 0 || stx + 1 >= text.length()) return null;
int start = stx + 1;
int end = text.length();
while (end > start && text.charAt(end - 1) == '\u0000') {
end--;
}
return text.substring(start, end);
}
/**
* Determines routing target based on business identifiers.
* Actual logic is environment-specific.
*/
private String determineTarget(String orderId, String type, String sessionId) {
// Production-grade placeholder routing logic
if (orderId != null && orderId.startsWith("A")) {
return "DEST_A";
}
if ("SESSION_X".equals(sessionId)) {
return "DEST_B";
}
return "DEST_DEFAULT";
}
но моя проблема в том, что я не смог правильно проанализировать сообщение, как сказано в журналах ниже, оно извлекает нулевые значения;
[RoutingPlugin] beforeSend: direct=true address=FromSellSide
[RoutingPlugin] Decoded from body: SessionID=null, MsgType=null, ClientMsgID=
[RoutingPlugin] DECODE_ERROR:SESSION_OR_FIX_MISSING for messageID=0
Как правильно проанализировать тело сообщения org.apache.activemq.artemis.api.core.Message?
пример сообщения отображается в ParentQ:
bytes:
01 00 00 00 08 00 00 00 12 53 00 65 00 73 00 73 00 69 00 6f 00 6e 00 49 00 44 00 0a 00 00 00 12 43 00 4f 00 4d 00 4d 00 4f 00 4e 00 45 00 58 00 45 00 00 00 00 12 45 00 76 00 65 00 6e 00 74 00 44 00 61 00 74 00 61 00 0a 00 00 02 4c 38 00 3d 00 46 00 49 00 58 00 2e 00 34 00 2e 00 32 00 01 00 39 00 3d 00 32 00 37 00 31 00 01 00 33 00 35 00 3d 00 44 00 01 00 33 00 34 00 3d 00 31 00 39 00 01 00 34 00 33 00 3d 00 59 00 01 00 34 00 39 00 3d 00 53 00 49 00 4d 00 55 00 4c 00 41 00 54 00 4f 00 52 00 01 00 35 00 30 00 3d 00 35 00 32 00 34 00 33 00 33 00 34 00 65 00 66 00 72 00 01 00 35 00 32 00 3d 00 32 00 30 00 32 00 35 00 31 00 31 00 32 00 31 00 2d 00 30 00 34 00 3a 00 32 00 33 00 3a 00 33 00 34 00 2e 00 32 00 35 00 36 00 01 00 35 00 36 00 3d 00 4f 00 4d 00 53 00 5f
text:
SessionID
COMMONEXEEventData
L8=FIX.4.29=27135=D34=1943=Y49=SIMULATOR50=524334efr52=20251121-04:23:34.25656=OMS_
Подробнее здесь: [url]https://stackoverflow.com/questions/79826409/how-to-route-messages-from-a-single-jms-producer-to-one-of-multiple-mdbs-based-o[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия