Amazon MQ как ретранслятор брокера для Spring + Websocket + Stomp не работаетJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Amazon MQ как ретранслятор брокера для Spring + Websocket + Stomp не работает

Сообщение Anonymous »

Я не могу подключиться к Amazon MQ (Active MQ), который я настраиваю в качестве ретранслятора брокера для соединений веб-сокетов по протоколу STOMP.
Ниже приведены некоторые из моих попыток настройки
Среда:
Версия Spring Boot: 2.7.17
Сборка.gradle

Код: Выделить всё

plugins {
id 'org.springframework.boot' version '2.7.17'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id 'maven-publish'
}
repositories {
mavenLocal()
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.webjars:stomp-websocket:2.3.3'
implementation 'org.springframework.boot:spring-boot-starter-amqp:2.7.9'
implementation 'io.netty:netty-all:4.1.59.Final'
implementation 'io.projectreactor.netty:reactor-netty:1.0.12'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
group = 'com.dailycodebuffer'
version = '0.0.1-SNAPSHOT'
description = 'spring-websocket'
bootJar {
mainClassName = 'com.xxx.SpringBootApp'
}
publishing {
publications {
maven(MavenPublication) {
from(components.java)
}
}
}
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8'
}
tasks.withType(Javadoc) {
options.encoding = 'UTF-8'
}

Настройка Amazon MQ
OpenWire -> ssl://abc.amazonaws.com:61617

STOMP -> stomp+ssl://abc.amazonaws.com:61614
application.properties
relay.host= my.amazonaws.com
relay.port= 61617
relay.host.user= myuserrelay.host.password= мой пароль
Моя первая попытка настройки Websocket приведена ниже

Код: Выделить всё

    @Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Value("${relay.host}")
private String host;
@Value("${relay.port}")
private int port;
@Value("${relay.host.user}")
private String user;
@Value("${relay.host.password}")
private String password;

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(1);
threadPoolTaskScheduler.initialize();
threadPoolTaskScheduler.setThreadNamePrefix("wss-heartbeat-thread-");

final ReactorNettyTcpClient client = new ReactorNettyTcpClient(tcpClient -> tcpClient
.host(host)
.port(port)
.secure(), new StompReactorNettyCodec());

registry.enableStompBrokerRelay("/queue/", "/topic/")
.setTcpClient(client)
.setAutoStartup(true)
.setClientLogin(user)
.setClientPasscode(password)
.setSystemLogin(user)
.setSystemPasscode(password)
.setTaskScheduler(threadPoolTaskScheduler);

registry.setApplicationDestinationPrefixes("/app");
registry.setPreservePublishOrder(true);
}

@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
final RequestUpgradeStrategy upgradeStrategy = new TomcatRequestUpgradeStrategy();
registry.addEndpoint("/chat")
.setHandshakeHandler(new DefaultHandshakeHandler(upgradeStrategy))
.setAllowedOrigins("*");
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new AuthChannelInterceptor());
}

}
Ошибка

Код: Выделить всё

2024-05-09 19:54:17.221  INFO 19660 --- [ent-scheduler-6] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session _system_: Transport failure: org.springframework.messaging.simp.stomp.StompConversionException: '\r' must be followed by '\n'
Caused by: org.springframework.messaging.simp.stomp.StompConversionException: '\r' must be followed by '\n'
at org.springframework.messaging.simp.stomp.StompDecoder.tryConsumeEndOfLine(StompDecoder.java:357) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.simp.stomp.StompDecoder.readCommand(StompDecoder.java:217) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.simp.stomp.StompDecoder.decodeMessage(StompDecoder.java:143) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.simp.stomp.StompDecoder.decode(StompDecoder.java:115) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.simp.stomp.StompDecoder.decode(StompDecoder.java:88) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.simp.stomp.StompReactorNettyCodec.decodeInternal(StompReactorNettyCodec.java:54) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.tcp.reactor.AbstractNioBufferReactorNettyCodec.decode(AbstractNioBufferReactorNettyCodec.java:41) ~[spring-messaging-5.3.14.jar:5.3.14]
at org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient$StompMessageDecoder.decode(ReactorNettyTcpClient.java:348) ~[spring-messaging-5.3.14.jar:5.3.14]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:508) ~[netty-all-4.1.59.Final.jar:4.1.59.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:447) ~[netty-all-4.1.59.Final.jar:4.1.59.Final]

Примечание. Мы также попробовали использовать порт STOMP 61614, что привело к следующей ошибке.

Код: Выделить всё

2024-05-09 20:08:36.531  INFO 23700 --- [ient-loop-nio-3] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session _system_: Failed to connect: handshake timed out after 10000ms

io.netty.handler.ssl.SslHandshakeTimeoutException: handshake timed out after 10000ms
Моя вторая попытка настройки Websocket приведена ниже

Код: Выделить всё

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Value("${relay.host}")
private String host;
@Value("${relay.port}")
private int port;
@Value("${relay.host.user}")
private String user;
@Value("${relay.host.password}")
private String password;

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(1);
threadPoolTaskScheduler.initialize();
threadPoolTaskScheduler.setThreadNamePrefix("wss-heartbeat-thread-");

final ReactorNettyTcpClient client = new ReactorNettyTcpClient(tcpClient -> tcpClient
.host(host)
.port(61617)//Open-wire SSL port
.secure(), new StompReactorNettyCodec());

registry.enableStompBrokerRelay("/queue/", "/topic/")
.setTcpClient(client)
.setAutoStartup(true)
.setRelayHost(host)
.setRelayPort(61614) //STOMP port
.setClientLogin(user)
.setClientPasscode(password)
.setSystemLogin(user)
.setSystemPasscode(password)
.setTaskScheduler(threadPoolTaskScheduler);

registry.setApplicationDestinationPrefixes("/app");
registry.setPreservePublishOrder(true);
}

@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
final RequestUpgradeStrategy upgradeStrategy = new TomcatRequestUpgradeStrategy();
registry.addEndpoint("/chat")
.setHandshakeHandler(new DefaultHandshakeHandler(upgradeStrategy))
.setAllowedOrigins("*");
}
Это также не работает с

Код: Выделить всё

2024-05-09 20:14:11.863  INFO 1724 --- [ent-scheduler-8] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session _system_: Transport failure: org.springframework.messaging.simp.stomp.StompConversionException: '\r' must be followed by '\n'

io.netty.handler.codec.DecoderException: org.springframework.messaging.simp.stomp.StompConversionException: '\r' must be followed by '\n'
Моя третья попытка настройки Websocket приведена ниже

Код: Выделить всё

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Value("${relay.host}")
private String host;
@Value("${relay.port}")
private int port;
@Value("${relay.host.user}")
private String user;
@Value("${relay.host.password}")
private String password;

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
final ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(1);
threadPoolTaskScheduler.initialize();
threadPoolTaskScheduler.setThreadNamePrefix("wss-heartbeat-thread-");

registry.enableStompBrokerRelay("/queue/", "/topic/")
.setAutoStartup(true)
.setRelayHost(host)
.setRelayPort(port) //Open-wire SSL port
.setClientLogin(user)
.setClientPasscode(password)
.setSystemLogin(user)
.setSystemPasscode(password)
.setTaskScheduler(threadPoolTaskScheduler);

registry.setApplicationDestinationPrefixes("/app");
registry.setPreservePublishOrder(true);
}

@Override
public void registerStompEndpoints(final StompEndpointRegistry registry) {
final RequestUpgradeStrategy upgradeStrategy = new TomcatRequestUpgradeStrategy();
registry.addEndpoint("/chat")
.setHandshakeHandler(new DefaultHandshakeHandler(upgradeStrategy))
.setAllowedOrigins("*");
}

@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new AuthChannelInterceptor());
}

}
В этом случае сервер запускается успешно без каких-либо сбоев соединения, но при публикации сообщения брокеру я получаю следующую ошибку Брокер сообщений не активен.

Код: Выделить всё

2024-05-09 20:30:53.778 ERROR 30512 --- [nio-8083-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessageDeliveryException: Message broker not active. Consider subscribing to receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.] with root cause

org.springframework.messaging.MessageDeliveryException: Message broker not active. Consider subscribing to receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.
Он работает локально с использованием ActiveMQ с открытым исходным кодом, не работает только AWS MQ.
Любая помощь эта работа была бы очень признательна.

Подробнее здесь: https://stackoverflow.com/questions/784 ... ot-working
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»