Код: Выделить всё
import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
@Component
@Log4j2
public class SomeJmsListener {
@JmsListener(
id = "id_listener_1",
destination = "wmstk_queue",
containerFactory = "jmsListenerContainerFactory"
)
public void listenMessage(Message message) throws Exception {
if (message instanceof ObjectMessage objectMessage) {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
WebClient.create()
.post()
.uri("some_url")
.bodyValue(messageInfo)
.retrieve()
.toEntity(String.class)
.block();
}
}
}
Код: Выделить всё
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Service;
@Service
@Log4j2
@RequiredArgsConstructor
public class JmsListenerControlService {
private final JmsListenerEndpointRegistry registry;
public void stop(String listenerId) {
var container = registry.getListenerContainer(listenerId);
if (container.isRunning()) {
container.stop();
}
}
}
Единственное решение, которое я нашел сейчас, — это наблюдать за контейнером и вручную прерывать поток ListenMessage:
Код: Выделить всё
import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Component
@Log4j2
public class SomeJmsListener {
@Autowired
private JmsListenerEndpointRegistry registry;
private ScheduledExecutorService scheduledExecutorService;
@JmsListener(
id = "id_listener_1",
destination = "wmstk_queue",
containerFactory = "jmsListenerContainerFactory"
)
public void listenMessage(Message message) throws Exception {
observeContainer(Thread.currentThread());
if (message instanceof ObjectMessage objectMessage) {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();
WebClient.create()
.post()
.uri("some_url")
.bodyValue(messageInfo)
.retrieve()
.toEntity(String.class)
.block();
}
stopObservation();
}
/**
* Scheduled job observes jms listener and if it is not running - interrupts its thread
*/
private void observeContainer (Thread thread){
String listenerId = "id_listener_1";
var listenerContainer = registry.getListenerContainer(listenerId);
log.debug("Start observation for listener container with id: {}", listenerId);
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(
() -> {
if (!listenerContainer.isRunning()) {
stopObservation();
thread.interrupt(); // interrupt the thread to prevent further execution of listenMessage
} else {
log.debug("Listener active, id: {}", listenerId);
}
},
10, // 10ms initial delay
100, // execute each 100ms
TimeUnit.MILLISECONDS
);
}
private void stopObservation () {
log.info("Stop observation for listener container with id: {}", "wmstk-listener-endpoint-1");
if (scheduledExecutorService != null) {
scheduledExecutorService.close();
scheduledExecutorService = null;
}
}
}
org.springframework.jms.listener.adapter.ListenerExecutionFailedException:
Метод прослушивателя ' ListenMessage выдает java.lang.Exception'
ОШИБКА [er-endpoint-1-1] o.s.j.l.DefaultMessageListenerContainer :
Application исключение переопределено ошибкой отката
Вызвано: java.io.InterruptedIOException:
Чтение сокета прервано:
oracle.jakarta.jms.AQjmsException: Ошибка ввода-вывода: чтение сокета прервано
Итак, это исключение возникает при откате , а именно:
Код: Выделить всё
package oracle.jakarta.jms;
class AQjmsSession {
synchronized void forceRollback() throws JMSException {
Connection db_conn = null;
db_conn = this.getDBConnection();
try {
db_conn.rollback(); // here exception occurs
this.setConsistency(true);
} catch (SQLException var3) {
throw new AQjmsException(var3);
}
this.restartConsumers();
}
}
А может быть есть какое-то другой способ плавно сделать откат и прервать выполнение метода?
Подробнее здесь: https://stackoverflow.com/questions/792 ... s-listener
Мобильная версия