Привязка адреса с очередью не работает для встроенной Activemq ArtemisJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Привязка адреса с очередью не работает для встроенной Activemq Artemis

Сообщение Anonymous »

Я пытаюсь выполнить модульное тестирование моего кода производителя JMS в приложении Spring Boot на встроенном ActiveMQ Artemis.
Ниже приведен мой код для запуска встроенного брокера с тремя очередями test.queue.0, test.queue.1 и test.queue.2
Configuration embeddedActiveMqServerConfig = new ConfigurationImpl();
embeddedActiveMqServerConfig
//.addAcceptorConfiguration("in-vm", "vm://0")
.addAcceptorConfiguration("tcp", "tcp://127.0.0.1:61616")
.setResolveProtocols(false)
.addQueueConfiguration((new QueueConfiguration("test.queue.0")).setAddress("test.queue.0").setRoutingType(RoutingType.ANYCAST).setEnabled(true))
.addQueueConfiguration((new QueueConfiguration("test.queue.1")).setAddress("test.queue.1").setRoutingType(RoutingType.ANYCAST).setEnabled(true))
.addQueueConfiguration((new QueueConfiguration("test.queue.2")).setAddress("test.queue.2").setRoutingType(RoutingType.ANYCAST).setEnabled(true))
.setSecurityEnabled(false);
embeddedActiveMqServer = new EmbeddedActiveMQExtension(embeddedActiveMqServerConfig);
embeddedActiveMqServer.start();

Код производителя работает нормально, и я могу создавать новые сообщения в test.queue.0 и получать взамен jmsId, но я получаю следующий оператор отладки в консоль для созданного сообщения
[Thread-9 (ActiveMQ-server-org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl$6@4a901445)] DEBUG o.a.a.a.c.p.impl.PostOfficeImpl - Message CoreMessage[messageID=60129543556,durable=true,userID=9e9c9f71-da53-11ef-83f3-0205857feb80,priority=4, timestamp=Fri Jan 24 14:03:34 CET 2025,expiration=0, durable=true, address=test.queue.0,size=415,properties=TypedProperties[__AMQ_CID=9cba23cc-da53-11ef-83f3-0205857feb80,uber_$dash$_trace_$dash$_id=efcf8ad9a21d1b0b:fabdfb38a47041e6:efcf8ad9a21d1b0b:0,_AMQ_ROUTING_TYPE=1]]@1341335240 is not going anywhere as it didn't have a binding on address:test.queue.0

В соответствии с этим сообщением привязка адреса отсутствует, так как в моем созданном сообщении, отправленном на адрес test.queue.0, не попадает в очередь назначения test.queue.0 , поскольку между ними нет связи, но она есть в моей конфигурации.
Может кто-нибудь помочь мне понять, что я делаю не так с кодом конфигурации?< /p>
ОБНОВЛЕНИЕ
config-unit.yaml — они сопоставлены со свойствами конфигурации
mq:
brokerUrl:
- 127.0.0.1:61616
httpEnabled: false
sslEnabled: false
< /code>

Все конфигурации Beans < /h2>
@Configuration
@EnableConfigurationProperties(ApplicationProperties.class)
@EnableJms
@EnableScheduling
public class ApplicationConfig {

private static final Logger logger = LoggerFactory.getLogger(ApplicationConfig.class);
private final ApplicationProperties applicationProperties;

public ApplicationConfig(ApplicationProperties applicationProperties) {
this.applicationProperties = applicationProperties;
}

public ConnectionFactory jmsConnectionFactory() {
TransportConfiguration[] jmsTransportConfigurations = Optional.of(applicationProperties.getBrokerUrl())
.orElse(Collections.emptyList())
.stream()
.map(brokerUrl -> {
String[] splitHostAndPort = brokerUrl.split(":");
return new TransportConfiguration(NettyConnectorFactory.class.getName(), Map.of(
TransportConstants.HOST_PROP_NAME, splitHostAndPort[0],
TransportConstants.PORT_PROP_NAME, splitHostAndPort[1],
TransportConstants.HTTP_ENABLED_PROP_NAME, applicationProperties.isHttpEnabled(),
TransportConstants.SSL_ENABLED_PROP_NAME, applicationProperties.isSslEnabled()
));
}).toArray(TransportConfiguration[]::new);

return ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, jmsTransportConfigurations);
}

@Bean
public CachingConnectionFactory cachingConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(jmsConnectionFactory());
cachingConnectionFactory.setSessionCacheSize(10);
return cachingConnectionFactory;
}

@Bean
public DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(cachingConnectionFactory);
factory.setConcurrency("5");
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
// factory.setErrorHandler(error -> logger.error("Error in listener: {}", error.getMessage(), error));
factory.setSessionTransacted(true);
return factory;
}

@Bean
public JmsListenerEndpointRegistry jmsListenerEndpointRegistry() {
return new JmsListenerEndpointRegistry();
}

@Bean
public JmsListenerEndpointRegistrar jmsListenerEndpointRegistrar(DefaultJmsListenerContainerFactory defaultJmsListenerContainerFactory,
JmsListenerEndpointRegistry jmsListenerEndpointRegistry) {
JmsListenerEndpointRegistrar registrar = new JmsListenerEndpointRegistrar();
registrar.setEndpointRegistry(jmsListenerEndpointRegistry);
registrar.setContainerFactory(defaultJmsListenerContainerFactory);
return registrar;
}

@Bean
RestTemplate RestTemplate(@Qualifier("outboundAuthInterceptor") ClientHttpRequestInterceptor authInterceptor) {
var httpClient = HttpClientBuilder.create().build();
RestTemplate restTemplate = new RestTemplate();
restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory(httpClient));
restTemplate.getInterceptors().add(authInterceptor);
return restTemplate;
}

}


Производитель встроен в контроллер покоя
@RestController
@RequestMapping("/api/v1/mq")
public class MqController {
private final JmsTemplate jmsTemplate;

public MqController(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}

@PostMapping("/{queue}/send")
public ResponseEntity post(@PathVariable("queue") String queue,
@RequestBody String message,
@RequestParam(name = "group_id", required = false) String groupId) {
String messageId = jmsTemplate.execute(session -> {
Message jmsMessage = session.createTextMessage(message);
if (StringUtils.isNotBlank(groupId)) {
jmsMessage.setStringProperty("JMSXGroupID", groupId);
}
javax.jms.Queue queueDestination = session.createQueue(queue);
session.createProducer(queueDestination).send(jmsMessage);
return jmsMessage.getJMSMessageID();
});

return ResponseEntity.status(HttpStatus.CREATED).body(new MessageCreatedResponse(messageId, message, new Date()));

}
}


это регистрация прослушивателя, которая никогда не вызывается, поскольку сообщение никогда не попадает в очередь
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId(listener.getId().toString());
endpoint.setDestination(listener.getQueue());
endpoint.setMessageListener(message -> {
try {
restTemplate.postForObject(listener.getCallbackUrl(), message.getBody(String.class), Void.class);
message.acknowledge();
logger.debug("message {} acknowledged", message.getJMSMessageID());
} catch (JMSException e) {
logger.error("Could not retrieve message contents or failed in acknowledgement", e);
throw new RuntimeException("Could not retrieve message contents or failed in acknowledgement", e);
} catch (RestClientResponseException e) {
logger.error("Callback to {} failed with statusCode {}", listener.getCallbackUrl(), e.getRawStatusCode(), e);
throw new RuntimeException("Callback to " + listener.getCallbackUrl() + " failed with statusCode " + e.getRawStatusCode(), e);
}
});
jmsListenerEndpointRegistrar.registerEndpoint(endpoint);
jmsListenerEndpointRegistrar.afterPropertiesSet();
activeListenerIds.add(listener.getId());


Подробнее здесь: https://stackoverflow.com/questions/793 ... mq-artemis
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»