Код: Выделить всё
kafkaReceiver
.receive()
.delayUntil(record -> saveInDb(record.value())
.doOnNext(v -> record.receiverOffset().commit()
.subscribeOn(Schedulers.boundedElastic())
.subscribe()))
.retry()
.subscribe();
private Mono saveInDb(Dto message) {
// save in database here
}
Меня беспокоит то, что мы каждый раз закрываем и заново создаем KafkaConsumer. time saveInDb не удается. Для меня это выглядит как накладные расходы на операцию закрытия-восстановления. Скажем, если база данных будет недоступна в течение некоторого времени, мы закроем и воссоздадим KafkaConsumer много раз. Согласится ли брокер Kafka на это и не заблокирует меня (моего потребителя) за такие частые операции?
Можно ли как-нибудь просто повторить попытку только операции saveInDb, не закрывая KafkaConsumer?
Подробнее здесь: https://stackoverflow.com/questions/783 ... ing-failed