Как немедленно остановить выполнение Spring JmsListener?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как немедленно остановить выполнение Spring JmsListener?

Сообщение Anonymous »

В моем приложении Java Spring Boot 3 у меня есть простой JmsListener для Oracle AQ:

Код: Выделить всё

import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

@Component
@Log4j2
public class SomeJmsListener {

@JmsListener(
id = "id_listener_1",
destination = "wmstk_queue",
containerFactory = "jmsListenerContainerFactory"
)
public void listenMessage(Message message) throws Exception {
if (message instanceof ObjectMessage objectMessage) {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();

WebClient.create()
.post()
.uri("some_url")
.bodyValue(messageInfo)
.retrieve()
.toEntity(String.class)
.block();
}
}
}
Этот прослушиватель можно остановить в какой-то момент выполнения приложения с помощью JmsListenerEndpointRegistry следующим образом:

Код: Выделить всё

import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Service;

@Service
@Log4j2
@RequiredArgsConstructor
public class JmsListenerControlService {

private final JmsListenerEndpointRegistry registry;

public void stop(String listenerId) {
var container = registry.getListenerContainer(listenerId);

if (container.isRunning()) {
container.stop();
}
}
}
Цель состоит в том, чтобы, если контейнер прослушивателя остановлен, выполнение ListenMessage должно быть немедленно остановлено (если оно запущено) и JMS Сеанс необходимо откатить.
Единственное решение, которое я нашел сейчас, — это наблюдать за контейнером и вручную прерывать поток ListenMessage:

Код: Выделить всё

import com.example.webfluxexample.model.MessageInfo;
import jakarta.jms.Message;
import jakarta.jms.ObjectMessage;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
@Log4j2
public class SomeJmsListener {

@Autowired
private JmsListenerEndpointRegistry registry;

private ScheduledExecutorService scheduledExecutorService;

@JmsListener(
id = "id_listener_1",
destination = "wmstk_queue",
containerFactory = "jmsListenerContainerFactory"
)
public void listenMessage(Message message) throws Exception {
observeContainer(Thread.currentThread());

if (message instanceof ObjectMessage objectMessage) {
MessageInfo messageInfo = (MessageInfo) objectMessage.getObject();

WebClient.create()
.post()
.uri("some_url")
.bodyValue(messageInfo)
.retrieve()
.toEntity(String.class)
.block();
}
stopObservation();
}

/**
* Scheduled job observes jms listener and if it is not running - interrupts its thread
*/
private void observeContainer (Thread thread){
String listenerId = "id_listener_1";
var listenerContainer = registry.getListenerContainer(listenerId);

log.debug("Start observation for listener container with id: {}", listenerId);

scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

scheduledExecutorService.scheduleAtFixedRate(
() ->  {
if (!listenerContainer.isRunning()) {
stopObservation();
thread.interrupt(); // interrupt the thread to prevent further execution of listenMessage
} else {
log.debug("Listener active, id: {}", listenerId);
}
},
10, // 10ms initial delay
100, // execute each 100ms
TimeUnit.MILLISECONDS
);
}

private void stopObservation () {
log.info("Stop observation for listener container with id: {}", "wmstk-listener-endpoint-1");
if (scheduledExecutorService != null) {
scheduledExecutorService.close();
scheduledExecutorService = null;
}
}
}
После прерывания сообщение остаётся в очереди и обработка прерывается как положено, но в самом конце логов я вижу это исключение и не уверен, стоит ли по этому поводу беспокоиться или нет:

org.springframework.jms.listener.adapter.ListenerExecutionFailedException:
Метод прослушивателя ' ListenMessage выдает java.lang.Exception'


ОШИБКА [er-endpoint-1-1] o.s.j.l.DefaultMessageListenerContainer :
Application исключение переопределено ошибкой отката


Вызвано: java.io.InterruptedIOException:
Чтение сокета прервано:
oracle.jakarta.jms.AQjmsException: Ошибка ввода-вывода: чтение сокета прервано

Итак, это исключение возникает при откате , а именно:

Код: Выделить всё

package oracle.jakarta.jms;

class AQjmsSession {

synchronized void forceRollback() throws JMSException {
Connection db_conn = null;
db_conn = this.getDBConnection();

try {
db_conn.rollback(); // here exception occurs
this.setConsistency(true);
} catch (SQLException var3) {
throw new AQjmsException(var3);
}

this.restartConsumers();
}
}

Стоит ли беспокоиться об этом исключении, если сообщение остается в очереди и доступно для чтения, хотя исключение говорит, что исключение произошло при откате?
А может быть есть еще какое-то способ плавно сделать откат и прервать выполнение метода?


Подробнее здесь: https://stackoverflow.com/questions/792 ... mslistener
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»