Чтение сообщений из Oracle AQ с помощью специальной полезной нагрузки ADT.JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Чтение сообщений из Oracle AQ с помощью специальной полезной нагрузки ADT.

Сообщение Anonymous »

Я пытаюсь прочитать сообщения из очереди Oracle, которая имеет пользовательскую полезную нагрузку ADT (а не простой обмен текстовыми сообщениями), используя простой проект потока Spring Cloud. У меня возникли трудности с этим с помощью Spring, но я смог сделать это с помощью простой Java, поэтому проблема не должна быть связана с Oracle.
Я получаю следующую ошибку:< /p>

Код: Выделить всё

oracle.jakarta.jms.AQjmsException: JMS-137: Payload factory must be specified for destinations with ADT payloads
at oracle.jakarta.jms.AQjmsError.throwEx(AQjmsError.java:317) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsConsumer.(AQjmsConsumer.java:482) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsConsumer.(AQjmsConsumer.java:338) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsSession.createConsumer(AQjmsSession.java:8974) ~[aqapi-jakarta-23.3.1.0.jar:na]
at oracle.jakarta.jms.AQjmsSession.createConsumer(AQjmsSession.java:8863) ~[aqapi-jakarta-23.3.1.0.jar:na]
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:930) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:225) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1290) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1256) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1247) ~[spring-jms-6.1.13.jar:6.1.13]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1140) ~[spring-jms-6.1.13.jar:6.1.13]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
(я поместил logging.level.org.springframework.jms=TRACE в application.properties)
Имя очереди — container_queue, пользователь базы данных — jmsuser, а имя типа объекта базы данных — message_containerВот проект Spring-cloud-stream:
build.gradle

Код: Выделить всё

implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.boot:spring-boot-starter-activemq'
implementation 'jakarta.jms:jakarta.jms-api:3.0.0'
implementation 'com.oracle.database.messaging:aqapi-jakarta:23.3.1.0'
implementation 'com.oracle.database.jdbc:ojdbc11:23.3.0.23.09'
implementation 'com.oracle.database.jdbc:ucp:23.3.0.23.09'
MessageContainer.java

Код: Выделить всё

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Struct;
import java.sql.Timestamp;

import oracle.jdbc.OracleTypes;
import oracle.sql.Datum;
import oracle.sql.ORAData;
import oracle.sql.ORADataFactory;

public class MessageContainer implements ORAData, ORADataFactory {
String primaryId;
String businessUnitCode;
Timestamp scanTimestamp;
public static final String SQL_NAME = "jmsuser.MESSAGE_CONTAINER";
public static final int SQL_TYPECODE = OracleTypes.STRUCT;

protected static final MessageContainer MESSAGECONTAINERFactory = new MessageContainer();

public MessageContainer() {
}

public MessageContainer(String primaryId, String businessUnitCode, Timestamp scanTimestamp) {
this.primaryId = primaryId;
this.businessUnitCode = businessUnitCode;
this.scanTimestamp = scanTimestamp;
}

public static ORADataFactory getORADataFactory() {
return MESSAGECONTAINERFactory;
}

public ORAData create(Datum d, int sqlType) throws SQLException {
if (d == null) {
return null;
}
if (!(d instanceof Struct)) {
throw new SQLException("Expected Struct type Datum but found "  + d.getClass());
}
Struct struct = (Struct) d;
Object[] attr = struct.getAttributes();
return new MessageContainer((String) attr[0], (String) attr[1], (Timestamp) attr[2]);
}

@Override
public Datum toDatum(Connection c) throws SQLException {
Object[] attributes = { primaryId, businessUnitCode, scanTimestamp };
Struct struct = c.createStruct(SQL_NAME, attributes);
return (Datum) struct;
}

public String getPrimaryId() {
return primaryId;
}

public void setPrimaryId(String primaryId) {
this.primaryId = primaryId;
}

public String getBusinessUnitCode() {
return businessUnitCode;
}

public void setBusinessUnitCode(String businessUnitCode) {
this.businessUnitCode = businessUnitCode;
}

public Timestamp getScanTimestamp() {
return scanTimestamp;
}

public void setScanTimestamp(Timestamp scanTimestamp) {
this.scanTimestamp = scanTimestamp;
}
}
OracleAQListener.java

Код: Выделить всё

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import jakarta.jms.Message;
import oracle.jakarta.jms.AQjmsAdtMessage;

@Component
public class OracleAQListener {
@JmsListener(destination = "container_queue", containerFactory = "jmsListenerContainerFactory")
public void receiveMessage(Message message) {
try {
if (message instanceof AQjmsAdtMessage adtMessage) {
MessageContainer msg = (MessageContainer) adtMessage.getAdtPayload();
System.out.println("Received a message! ID: " + msg.getPrimaryId() + " BU: " + msg.getBusinessUnitCode() + " date: " + msg.getScanTimestamp());

} else {
System.out.println("Received unknown message type: " + message.getClass().getName());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
OracleAQConfig.java (скорее всего здесь нужно изменить, но не знаю как)

Код: Выделить всё

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import oracle.jakarta.jms.AQjmsFactory;
import oracle.jdbc.pool.OracleDataSource;

@Configuration
public class OracleAQConfig {

@Bean
public ConnectionFactory connectionFactory() throws JMSException, SQLException {
Properties props = new Properties();
props.setProperty("oracle.jms.mapMessage", "true");

OracleDataSource dataSource = new OracleDataSource();
dataSource.setURL("jdbc:oracle:thin:@//dbaddress.com:1521/MYSID");
dataSource.setUser("jmsuser");
dataSource.setPassword("jmspassword");
dataSource.setConnectionProperties(props);

Connection conn = null;
try {
conn = dataSource.getConnection();
Map

Подробнее здесь: [url]https://stackoverflow.com/questions/79019411/read-messages-from-oracle-aq-with-custom-adt-payload[/url]
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»