Code:
@KafkaListener(
        autoStartup = "false",
        topics = "topic name",
        containerFactory = "byteArrayKafkaListenerContainerFactory",
        concurrency = "8",
        batch = "true",
        clientIdPrefix = "clientIdPrefix",
        id = "#{__listener.id}")
@Override
public void accept(List messages) {
    // Retries run inside the circuit breaker, so the breaker records one call per batch.
    circuitBreaker
            .decorateRunnable(() -> retryTemplate.execute(context -> processMessages(messages)))
            .run();
}
Code:
@Bean
CircuitBreakerConfig circuitBreakerConfig() {
    // With a sliding window of one call, a single failed call opens the breaker.
    return CircuitBreakerConfig.custom()
            .failureRateThreshold(50)
            .minimumNumberOfCalls(1)
            .waitDurationInOpenState(Duration.ofMinutes(1))
            .slidingWindowSize(1)
            .build();
}

@Bean
CircuitBreakerRegistry circuitBreakerRegistry(CircuitBreakerConfig circuitBreakerConfig) {
    return CircuitBreakerRegistry.of(circuitBreakerConfig);
}

@Bean
CircuitBreaker circuitBreaker(
        CircuitBreakerRegistry circuitBreakerRegistry,
        KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
    var circuitBreaker = circuitBreakerRegistry.circuitBreaker("circuitBreakerId");
    circuitBreaker.getEventPublisher().onStateTransition(event -> {
        var stateTransition = event.getStateTransition();
        var listenerContainerId = "listenerId";
        switch (stateTransition) {
            // Breaker opened: pause the listener container.
            case CLOSED_TO_OPEN, CLOSED_TO_FORCED_OPEN, HALF_OPEN_TO_OPEN: {
                var container = kafkaListenerEndpointRegistry.getListenerContainer(listenerContainerId);
                if (container != null) {
                    container.pause();
                }
                break;
            }
            // Breaker half-open or closed again: resume the listener container.
            case OPEN_TO_HALF_OPEN, HALF_OPEN_TO_CLOSED, FORCED_OPEN_TO_CLOSED, FORCED_OPEN_TO_HALF_OPEN: {
                var container = kafkaListenerEndpointRegistry.getListenerContainer(listenerContainerId);
                if (container != null) {
                    container.resume();
                }
                break;
            }
            default:
                throw new IllegalStateException("Unknown state transition: " + stateTransition);
        }
    });
    return circuitBreaker;
}
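The transition handling above can also be viewed as a plain lookup from transition to container action. The following is only a sketch of that idea, not the code from the question: the `Transition` and `Action` enums and the `TransitionActions` class are hypothetical stand-ins, so the example runs without Resilience4j or Spring on the classpath. It also assumes that transitions not listed in the switch (for example `CLOSED_TO_DISABLED`) should be ignored rather than cause an exception, which may or may not be what is wanted here.

```java
import java.util.EnumSet;
import java.util.Set;

public class TransitionActions {

    // Hypothetical stand-in for Resilience4j's CircuitBreaker.StateTransition:
    // the seven transitions handled in the switch above plus two that are not.
    enum Transition {
        CLOSED_TO_OPEN, CLOSED_TO_FORCED_OPEN, HALF_OPEN_TO_OPEN,
        OPEN_TO_HALF_OPEN, HALF_OPEN_TO_CLOSED, FORCED_OPEN_TO_CLOSED, FORCED_OPEN_TO_HALF_OPEN,
        CLOSED_TO_DISABLED, CLOSED_TO_METRICS_ONLY
    }

    // Hypothetical action to apply to the listener container.
    enum Action { PAUSE, RESUME, NONE }

    // Same grouping as the first case label in the switch.
    private static final Set<Transition> PAUSE_ON = EnumSet.of(
            Transition.CLOSED_TO_OPEN,
            Transition.CLOSED_TO_FORCED_OPEN,
            Transition.HALF_OPEN_TO_OPEN);

    // Same grouping as the second case label in the switch.
    private static final Set<Transition> RESUME_ON = EnumSet.of(
            Transition.OPEN_TO_HALF_OPEN,
            Transition.HALF_OPEN_TO_CLOSED,
            Transition.FORCED_OPEN_TO_CLOSED,
            Transition.FORCED_OPEN_TO_HALF_OPEN);

    // Assumption: anything outside both sets is a no-op instead of an error.
    static Action actionFor(Transition transition) {
        if (PAUSE_ON.contains(transition)) {
            return Action.PAUSE;
        }
        if (RESUME_ON.contains(transition)) {
            return Action.RESUME;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        // Print the action chosen for every transition in the stand-in enum.
        for (Transition transition : Transition.values()) {
            System.out.println(transition + " -> " + actionFor(transition));
        }
    }
}
```

In the real event handler, `Action.PAUSE` and `Action.RESUME` would correspond to the `container.pause()` and `container.resume()` calls already shown in the switch.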
Code:
....KafkaMessageListenerContainer : Paused consumer resumed by Kafka due to rebalance; consumer paused again, so the initial poll() will never return any records
More details here: https://stackoverflow.com/questions/791 ... a-listener