Потребители Quarkus Kafka перестают потреблять при слишком долгой обработке сообщенияJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Потребители Quarkus Kafka перестают потреблять при слишком долгой обработке сообщения

Сообщение Anonymous »

Контекст:
Я использую реактивный обмен сообщениями Quarkus и Smallrye, и у меня есть простой метод, использующий тему Кафки:

Код: Выделить всё

@Incoming("dedicated-channel-19")
@Blocking
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Retry(delay = 10, delayUnit = ChronoUnit.SECONDS, maxRetries = -1, maxDuration = 0, retryOn = KafkaRetryableException.class)
@CurrentThreadContext(propagated = ThreadContext.ALL_REMAINING)
public CompletionStage consumeDedicatedTopic19(Message message) {
return consumeEntry(message);
}
И моя конфигурация:

Код: Выделить всё

mp:
messaging:
incoming:
dedicated-channel-19:
connector: smallrye-kafka
topics: ${technical.kafka.topics.ohpEntry.prefix}_dedicated-channel-19_${technical.kafka.topics.ohpEntry.main.suffix}, ${technical.kafka.topics.ohpEntry.prefix}_dedicated-channel-19_${technical.kafka.topics.ohpEntry.errorRetry.suffix}
failure-strategy: ignore
throttled:
unprocessed-record-max-age:
ms: -1
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: SCRAM-SHA-512
jaas:
config: ${READ_SASL_CONFIG}
group:
id: ${technical.kafka.topics.ohpEntry.groupid}
Я подтверждаю или отклоняю свое сообщение вручную. Рабочий процесс синхронный, и я обрабатываю сообщения из темы одно за другим. У меня есть аннотация @Retry, которая обрабатывает технические сбои ->, например, при обработке сообщения, если произошло техническое исключение (503...), я выдаю KafkaRetryableExeption и повторяю попытку, пока внешняя вызываемая служба снова не заработает.
Проблема:
Когда повторные попытки происходят слишком долго (на данный момент я видел, что моя проблема возникает примерно через 2 часа повторных попыток) ), мой потребитель перестанет обрабатывать новые сообщения -> я повторяю отправку сообщения в течение более 2 часов, затем недоступная служба снова активируется, поэтому мое сообщение успешно обработано и сообщение подтверждено. Но тогда новые сообщения больше не потребляются и складываются в очередь. Единственный способ справиться с этим — перезапустить микросервис.
Что я пробовал:
  • Я просмотрел эту документацию https://quarkus.io/guides/kafka#kafka-configuration и попробовал поиграть с параметрами throttled.unprocessed-record-max-age.ms и пауза, если нет запросов. Я думал, что мои потребители приостанавливают или останавливаются, потому что они не получили подтверждения или отказа в течение заданного интервала времени, но это ничего не изменило.
  • Я пытался следить за состоянием своей темы и потребителя. но когда появляется проблема, проверка работоспособности показывает, что все в порядке.
  • В моих журналах нет ошибок.
  • Я пытался добавить запасной метод для перехвата любого исключения, которое было бы выброшено молча, но не было бы перехвачено. что угодно.
  • Я пытался обновить Quarkus 3.6.5 до Quarkus 3.15.1, но поведение такое же.
Большое спасибо за помощь

Подробнее здесь: https://stackoverflow.com/questions/793 ... r-too-long
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»