Я использую реактивный обмен сообщениями Quarkus и Smallrye, и у меня есть простой метод, использующий тему Кафки:
Код: Выделить всё
@Incoming("dedicated-channel-19")
@Blocking
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Retry(delay = 10, delayUnit = ChronoUnit.SECONDS, maxRetries = -1, maxDuration = 0, retryOn = KafkaRetryableException.class)
@CurrentThreadContext(propagated = ThreadContext.ALL_REMAINING)
public CompletionStage consumeDedicatedTopic19(Message message) {
return consumeEntry(message);
}
Код: Выделить всё
mp:
messaging:
incoming:
dedicated-channel-19:
connector: smallrye-kafka
topics: ${technical.kafka.topics.ohpEntry.prefix}_dedicated-channel-19_${technical.kafka.topics.ohpEntry.main.suffix}, ${technical.kafka.topics.ohpEntry.prefix}_dedicated-channel-19_${technical.kafka.topics.ohpEntry.errorRetry.suffix}
failure-strategy: ignore
throttled:
unprocessed-record-max-age:
ms: -1
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: SCRAM-SHA-512
jaas:
config: ${READ_SASL_CONFIG}
group:
id: ${technical.kafka.topics.ohpEntry.groupid}
Проблема:
Когда повторные попытки происходят слишком долго (на данный момент я видел, что моя проблема возникает примерно через 2 часа повторных попыток) ), мой потребитель перестанет обрабатывать новые сообщения -> я повторяю отправку сообщения в течение более 2 часов, затем недоступная служба снова активируется, поэтому мое сообщение успешно обработано и сообщение подтверждено. Но тогда новые сообщения больше не потребляются и складываются в очередь. Единственный способ справиться с этим — перезапустить микросервис.
Что я пробовал:
- Я просмотрел эту документацию https://quarkus.io/guides/kafka#kafka-configuration и попробовал поиграть с параметрами throttled.unprocessed-record-max-age.ms и пауза, если нет запросов. Я думал, что мои потребители приостанавливают или останавливаются, потому что они не получили подтверждения или отказа в течение заданного интервала времени, но это ничего не изменило.
- Я пытался следить за состоянием своей темы и потребителя. но когда появляется проблема, проверка работоспособности показывает, что все в порядке.
- В моих журналах нет ошибок.
- Я пытался добавить запасной метод для перехвата любого исключения, которое было бы выброшено молча, но не было бы перехвачено. что угодно.
- Я пытался обновить Quarkus 3.6.5 до Quarkus 3.15.1, но поведение такое же.
Подробнее здесь: https://stackoverflow.com/questions/793 ... r-too-long