Как я могу сделать ручную резюме работать по подписке на тему Kafka?JAVA

Программисты JAVA общаются здесь
Anonymous
Как я могу сделать ручную резюме работать по подписке на тему Kafka?

Сообщение Anonymous »

ожидаемое поведение < /strong>
Я ожидаю, что приемник возобновит, как прослушивание событий из тематической работы по вызову называется Consumer.Resume (Consumer.Assignment ()); < /p>
Фактическое поведение < /strong>
Это бросает ошибку, которую вы должны вызвать один из методов получения. Кодовая логика

Код: Выделить всё

public  void applyCircuitBreaker(CircuitBreaker circuitBreaker, KafkaReceiver kafkaReceiver) {
circuitBreaker.getEventPublisher().onStateTransition((event) -> {
if (event.getStateTransition() == StateTransition.CLOSED_TO_OPEN) {
this.pauseReceiver(kafkaReceiver);
} else if (event.getStateTransition() == StateTransition.OPEN_TO_HALF_OPEN || event.getStateTransition() == StateTransition.HALF_OPEN_TO_CLOSED) {
this.resumeReceiver(kafkaReceiver);
}

});
}
< /code>
private  void pauseReceiver(KafkaReceiver kafkaReceiver) {
kafkaReceiver.doOnConsumer((consumer) -> {
consumer.pause(consumer.assignment());
return consumer;
}).doOnSuccess((success) -> {
log.info("Successful pause {}", success.paused());
}).doOnError((e) -> {
log.error("Error in pausing", e);
}).subscribe();
}
< /code>
private  void resumeReceiver(KafkaReceiver kafkaReceiver) {
kafkaReceiver.doOnConsumer((consumer) -> {
consumer.resume(consumer.assignment());
return consumer;
}).doOnSuccess((success) -> {
log.info("Successfully resumed kafka receiver {}", success.assignment());
}).doOnError((e) -> {
log.error("Error in resuming", e);
}).subscribe();
log.info("resumeReceiver: Kafka receiver resumed.");
}
Ищу некоторые предложения, если я упускаю что -нибудь здесь.


Подробнее здесь: https://stackoverflow.com/questions/792 ... bscription

Вернуться в «JAVA»