Рабочая настройка
Создание очередей
Код: Выделить всё
begin
dbms_aqadm.create_queue_table(
queue_table => 'poc_queue_source.nps_transactions_qt',
queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE',
multiple_consumers => FALSE
);
dbms_aqadm.create_queue(
queue_name => 'poc_queue_source.nps_transactions_queue',
queue_table => 'poc_queue_source.nps_transactions_qt'
);
dbms_aqadm.start_queue(
queue_name => 'poc_queue_source.nps_transactions_queue'
);
end;
/
Код: Выделить всё
declare
row_json varchar2(4000);
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message SYS.AQ$_JMS_TEXT_MESSAGE := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
begin
row_json := json_object(
'transactionId' value :NEW.transactionid,
'srcaccountid' value :NEW.srcaccountid,
'dstaccountid' value :NEW.dstaccountid
-- More stuff here
null on null
);
message.set_text(row_json);
dbms_aq.enqueue(
queue_name => 'poc_queue_source.nps_transactions_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
end;
Код: Выделить всё
@Bean
public DefaultMessageListenerContainer jmsListenerContainer(
DefaultJmsListenerContainerFactory jmsListenerContainerFactory,
JmsMessageListenerImpl messageListener,
@Value("${application.jms.listener.queue-name}") String queueName,
@Value("${application.jms.listener.idle-receives-per-task-limit}") Integer idleReceivesLimit
) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setMessageListener(messageListener);
endpoint.setDestination(queueName);
DefaultMessageListenerContainer container = jmsListenerContainerFactory.createListenerContainer(endpoint);
// Setting idleReceivesPerTaskLimit makes the Listener scale down concurrency when the queue is idle.
container.setIdleReceivesPerTaskLimit(idleReceivesLimit);
return container;
}
Использование библиотеки:
Код: Выделить всё
com.oracle.database.messaging
aqapi-jakarta
23.2.1.0
Не удалось настроить
Но если я перейду на сегментированную очередь, он больше не получает их.
Настройка сегментированной очереди:
Код: Выделить всё
begin
dbms_aqadm.create_sharded_queue(
queue_name => 'poc_queue_source.nps_transactions_shdqueue',
multiple_consumers => FALSE,
queue_payload_type => DBMS_AQADM.JMS_TYPE
);
dbms_aqadm.start_queue(
queue_name => 'poc_queue_source.nps_transactions_shdqueue'
);
end;
/
Код: Выделить всё
declare
row_json varchar2(4000);
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message SYS.AQ$_JMS_TEXT_MESSAGE := SYS.AQ$_JMS_TEXT_MESSAGE.construct;
begin
row_json := json_object(
'transactionId' value :NEW.transactionid,
'srcaccountid' value :NEW.srcaccountid,
'dstaccountid' value :NEW.dstaccountid
-- More stuff here
null on null
);
message.set_text(row_json);
dbms_aq.enqueue(
queue_name => 'poc_queue_source.nps_transactions_queue_shdqueue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle
);
end;
Я вижу, что сообщения появляются в таблице очереди в базе данных, но Вызов MessageConsumer.receive внутри org.springframework.jms.support.destination.JmsDestinationAccessor#receiveFromConsumer не получает сообщений.
Что-то не так с моей конфигурацией и настройкой? Я вижу, что структура таблицы очередей между не-сегментированными и сегментированными очередями различна.
Подробнее здесь: https://stackoverflow.com/questions/793 ... rded-queue