Каждое сообщение содержит поле, с помощью которого мы можем понять, что они связаны друг с другом. И цель состоит в том, чтобы порядок, в котором сообщения помещаются в очередь, сохранялся внутри группы, тогда как сообщения из разных групп могли обрабатываться параллельно.
Так, например. у нас в очереди 5 сообщений: A1-A2-A3-A4-A5. A1 и A3 связаны друг с другом, поэтому мы должны гарантировать, что сообщение A1 будет обработано раньше A3 (не только прослушано, но и обработано). Итак, чтобы сохранить порядок внутри группы для одного экземпляра приложения, я использую селектор:
Producer:
Код: Выделить всё
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
private final JmsTemplate jmsTemplate;
public MessageProducer(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void sendMessage(RequestDto requestDto) {
jmsTemplate.convertAndSend(
"test_queue",
requestDto,
message -> {
message.setStringProperty("requestDtoGroup", requestDto.getGroup());
return message;
}
);
}
}
Код: Выделить всё
import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
@Log4j2
public class MessageConsumer {
@JmsListener(
destination = "test_queue",
selector = "requestDtoGroup = 'group_1'"
)
public void listenGroup1(Message message) {
log.info("Message with group_1" + message);
// further processing
}
@JmsListener(
destination = "test_queue",
selector = "requestDtoGroup = 'group_2'"
)
public void listenGroup2(Message message) {
log.info("Message with group_2" + message);
// further processing
}
}
Код: Выделить всё
import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import java.time.Duration;
import java.util.Properties;
@Configuration
@EnableJms
public class JmsConfiguration {
@Bean
public ConnectionFactory connectionFactory() throws JMSException {
Properties properties = new Properties();
properties.setProperty("user", "AQ_USER");
properties.setProperty("password", "your_password");
return AQjmsFactory.getQueueConnectionFactory(
"jdbc:oracle:thin:@localhost:1521/ORCLPDB1",
properties
);
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SomeHandler someHandler) throws JMSException {
var containerFactory = new DefaultJmsListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory());
containerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
containerFactory.setSessionTransacted(true);
return containerFactory;
}
}
Но я не могу этого добиться, если 2 или запущено больше экземпляров приложения.
Если я параллельно запускаю другой экземпляр приложения (а именно, я запускаю другое приложение локально с копией- вставил класс MessageConsumer), то сообщения с одним и тем же requestDtoGroup (A1 и A3) обрабатываются параллельно, чего я хочу избежать.
Как добиться, чтобы сообщения с одним и тем же requestDtoGroup обрабатывались последовательно даже при наличии двух и более экземпляров приложения?
Что Я пробовал до сих пор:
- В некоторых ответах на SO я наткнулся на совет посмотреть на группировку сообщений, но, насколько я понял, это не так. Меня это не устраивает, поскольку сообщения Oracle AQ группируются в рамках одной транзакции, а мои сообщения группируются логически.
- Но все же я попробовал установить преобразуйте значение requestDtoGroup в идентификатор корреляции или JMSXGroupID, отправив
сообщение через jmsTemplate.convertAndSend, но это ни на что не повлияло. - Кроме того, я нашел этот вопрос в SO, где рекомендуется использовать параметры удаления из очереди для поиска путем исключения самого старого времени постановки в очередь для сообщения, но я не не знаю, можно ли это как-то настроить для слушателей.
Подробнее здесь: https://stackoverflow.com/questions/790 ... ltiple-con