Последовательная обработка логически сгруппированных сообщений в Oracle AQ несколькими потребителями.JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Последовательная обработка логически сгруппированных сообщений в Oracle AQ несколькими потребителями.

Сообщение Anonymous »

У меня есть служба весенней загрузки, использующая Oracle AQ. Служба принимает http-запросы, ставит тела запросов в виде сообщений в Oracle AQ, затем удаляет их из очереди и отправляет службам назначения через http, поэтому очередь здесь похожа на буфер.
Каждое сообщение содержит поле, с помощью которого мы можем понять, что они связаны друг с другом. И цель состоит в том, чтобы порядок, в котором сообщения помещаются в очередь, сохранялся внутри группы, тогда как сообщения из разных групп могли обрабатываться параллельно.
Так, например. у нас в очереди 5 сообщений: A1-A2-A3-A4-A5. A1 и A3 связаны друг с другом, поэтому мы должны гарантировать, что сообщение A1 будет обработано раньше A3 (не только прослушано, но и обработано). Итак, чтобы сохранить порядок внутри группы для одного экземпляра приложения, я использую селектор:
Producer:

Код: Выделить всё

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

private final JmsTemplate jmsTemplate;

public MessageProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

public void sendMessage(RequestDto requestDto) {
jmsTemplate.convertAndSend(
"test_queue",
requestDto,
message -> {
message.setStringProperty("requestDtoGroup", requestDto.getGroup());
return message;
}
);
}
}
Потребитель:

Код: Выделить всё

import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class MessageConsumer {

@JmsListener(
destination = "test_queue",
selector = "requestDtoGroup = 'group_1'"
)
public void listenGroup1(Message message) {
log.info("Message with group_1" + message);
// further processing
}

@JmsListener(
destination = "test_queue",
selector = "requestDtoGroup = 'group_2'"
)
public void listenGroup2(Message message) {
log.info("Message with group_2" + message);
// further processing
}
}

А это моя конфигурация для JMS:

Код: Выделить всё

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

import java.time.Duration;
import java.util.Properties;

@Configuration
@EnableJms
public class JmsConfiguration {

@Bean
public ConnectionFactory connectionFactory() throws JMSException {
Properties properties = new Properties();
properties.setProperty("user", "AQ_USER");
properties.setProperty("password", "your_password");

return AQjmsFactory.getQueueConnectionFactory(
"jdbc:oracle:thin:@localhost:1521/ORCLPDB1",
properties
);
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SomeHandler someHandler) throws JMSException {
var containerFactory = new DefaultJmsListenerContainerFactory();

containerFactory.setConnectionFactory(connectionFactory());

containerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
containerFactory.setSessionTransacted(true);

return containerFactory;
}
}
Для разных селекторов сообщения обрабатываются параллельно и работают должным образом.
Но я не могу этого добиться, если 2 или запущено больше экземпляров приложения.
Если я параллельно запускаю другой экземпляр приложения (а именно, я запускаю другое приложение локально с копией- вставил класс MessageConsumer), то сообщения с одним и тем же requestDtoGroup (A1 и A3) обрабатываются параллельно, чего я хочу избежать.
Как добиться, чтобы сообщения с одним и тем же requestDtoGroup обрабатывались последовательно даже при наличии двух и более экземпляров приложения?
Что Я пробовал до сих пор:
  • В некоторых ответах на SO я наткнулся на совет посмотреть на группировку сообщений, но, насколько я понял, это не так. Меня это не устраивает, поскольку сообщения Oracle AQ группируются в рамках одной транзакции, а мои сообщения группируются логически.
  • Но все же я попробовал установить преобразуйте значение requestDtoGroup в идентификатор корреляции или JMSXGroupID, отправив
    сообщение через jmsTemplate.convertAndSend, но это ни на что не повлияло.
  • Кроме того, я нашел этот вопрос в SO, где рекомендуется использовать параметры удаления из очереди для поиска путем исключения самого старого времени постановки в очередь для сообщения, но я не не знаю, можно ли это как-то настроить для слушателей.
На самом деле я добился желаемого поведения с помощью Kafka, где я использую отдельные разделы для группировки сообщений внутри темы, но мне приходится использовать Oracle AQ. Наверное имеет смысл посмотреть Oracle Transactional Event Queues, говорят, что это похоже на Kafka, но я бы попробовал сначала сделать это с помощью простого AQ.

Подробнее здесь: https://stackoverflow.com/questions/790 ... ltiple-con
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»