Лучший способ использовать автоматический выключатель в прослушивателе KafkaJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Лучший способ использовать автоматический выключатель в прослушивателе Kafka

Сообщение Anonymous »

У меня есть приложение весенней загрузки, написанное на Java и имеющее прослушиватель Kafka:

Код: Выделить всё

    @KafkaListener(
autoStartup = "false",
topics = "topic name",
containerFactory = "byteArrayKafkaListenerContainerFactory",
concurrency = "8",
batch = "true",
clientIdPrefix = "clientIdPrefix",
id = "#{__listener.id}")
@Override
public void accept(List messages) {
circuitBreaker
.decorateRunnable(() -> retryTemplate.execute(context -> processMessages(messages)))
.run();
}
Конфигурация автоматического выключателя (resilience4j) выделена ниже. Выключатель цепи, который используется выше, — это последнее, что настраивается в приведенном ниже кодовом блоке.

Код: Выделить всё

    @Bean
CircuitBreakerConfig circuitBreakerConfig() {
return CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.minimumNumberOfCalls(1)
.waitDurationInOpenState(Duration.ofMinutes(1))
.slidingWindowSize(1)
.build();
}

@Bean
CircuitBreakerRegistry circuitBreakerRegistry(CircuitBreakerConfig circuitBreakerConfig) {
return CircuitBreakerRegistry.of(circuitBreakerConfig);
}

@Bean
CircuitBreaker circuitBreaker(
CircuitBreakerRegistry circuitBreakerRegistry,
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
var circuitBreaker = circuitBreakerRegistry.circuitBreaker("circuitBreakerId");
circuitBreaker.getEventPublisher().onStateTransition(event -> {
var stateTransition = event.getStateTransition();
var listenerContainerId = "listenerId";
switch (stateTransition) {
case CLOSED_TO_OPEN, CLOSED_TO_FORCED_OPEN, HALF_OPEN_TO_OPEN: {
var container = kafkaListenerEndpointRegistry.getListenerContainer(listenerContainerId);
if (container != null) {
container.pause();
}
break;
}
case OPEN_TO_HALF_OPEN, HALF_OPEN_TO_CLOSED, FORCED_OPEN_TO_CLOSED, FORCED_OPEN_TO_HALF_OPEN: {
var container = kafkaListenerEndpointRegistry.getListenerContainer(listenerContainerId);
if (container != null) {
container.resume();
}
break;
}
default:
throw new IllegalStateException("Unknown state transition: " + stateTransition);
}
});
return circuitBreaker;
}
Если автоматический выключатель обнаруживает первое исключение, срабатывает код издателя событий onStateTransition, и поэтому для контейнера прослушивателя Kafka запрашивается приостановка. Следующее, что я вижу в журналах, это то, что все разделы Kafka отозваны и разделы назначены заново. Между сообщениями о назначении разделов я вижу следующее:

Код: Выделить всё

....KafkaMessageListenerContainer    : Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records
Проблема в том, что контейнер прослушивателя Kafka больше не приостанавливается, но и не возобновляет прослушивание темы. Я хотел бы правильно приостановить контейнер прослушивателя Kafka и возобновить его, как только автоматический выключатель перейдет в соответствующее состояние. Каков наилучший способ добиться этого? Я хочу выполнить приостановку прослушивателя Кафки, не вызывая перебалансировки. Сценарий: два модуля Java-приложения с весенней загрузкой, работающие в кластере Kubernetes, и прослушиватель Kafka одного из этих модулей сталкивается с временной проблемой при обработке сообщений (например, сохранением в базе данных из-за проблемы с подключением) и поэтому должен прекратить обработку сообщений до тех пор, пока не будет отключен автоматический выключатель. считает возможным возобновить работу.

Подробнее здесь: https://stackoverflow.com/questions/791 ... a-listener
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»