Потребители Quarkus Kafka перестают потреблять при слишком долгой обработке сообщенияJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Потребители Quarkus Kafka перестают потреблять при слишком долгой обработке сообщения

Сообщение Anonymous »

Контекст:
Я использую реактивный обмен сообщениями Quarkus и Smallrye, и у меня есть простой метод, использующий тему Кафки:

Код: Выделить всё

@Incoming("dedicated-channel-19")
@Blocking
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
@Retry(delay = 10, delayUnit = ChronoUnit.SECONDS, maxRetries = -1, maxDuration = 0, retryOn = KafkaRetryableException.class)
@CurrentThreadContext(propagated = ThreadContext.ALL_REMAINING)
public CompletionStage consumeDedicatedTopic19(Message message) {
return consumeEntry(message);
}
И моя конфигурация:

Код: Выделить всё

mp:
messaging:
incoming:
dedicated-channel-19:
connector: smallrye-kafka
topics: ${technical.kafka.topics.ohpEntry.prefix}_dedicated-channel-19_${technical.kafka.topics.ohpEntry.main.suffix}, ${technical.kafka.topics.ohpEntry.prefix}_dedicated-channel-19_${technical.kafka.topics.ohpEntry.errorRetry.suffix}
failure-strategy: ignore
throttled:
unprocessed-record-max-age:
ms: -1
security:
protocol: SASL_PLAINTEXT
sasl:
mechanism: SCRAM-SHA-512
jaas:
config: ${READ_SASL_CONFIG}
group:
id: ${technical.kafka.topics.ohpEntry.groupid}
Я подтверждаю или отклоняю свое сообщение вручную. Рабочий процесс синхронный, и я обрабатываю сообщения из темы одно за другим. У меня есть аннотация @Retry, которая обрабатывает технические сбои ->, например, при обработке сообщения, если произошло техническое исключение (503...), я выдаю KafkaRetryableExeption и повторяю попытку, пока внешняя вызываемая служба снова не заработает.
Проблема:
Когда повторные попытки происходят слишком долго (на данный момент я видел, что моя проблема возникает примерно через 2 часа повторных попыток) ), мой потребитель перестанет обрабатывать новые сообщения -> я повторяю отправку сообщения в течение более 2 часов, затем недоступная служба снова активируется, поэтому мое сообщение успешно обработано и сообщение подтверждено. Но тогда новые сообщения больше не потребляются и складываются в очередь. Единственный способ справиться с этим — перезапустить микросервис.
Что я пробовал:
  • Я просмотрел эту документацию https://quarkus.io/guides/kafka#kafka-configuration и попробовал поиграть с параметрами throttled.unprocessed-record-max-age.ms и пауза, если нет запросов. Я думал, что мои потребители приостанавливают или останавливаются, потому что они не получили подтверждения или отказа в течение заданного интервала времени, но это ничего не изменило.
  • Я пытался следить за состоянием своей темы и потребителя. но когда появляется проблема, проверка работоспособности показывает, что все в порядке.
  • В моих журналах нет ошибок.
  • Я пытался добавить запасной метод для перехвата любого исключения, которое было бы выброшено молча, но не было бы перехвачено. что угодно.
  • Я пытался обновить Quarkus 3.6.5 до Quarkus 3.15.1, но поведение такое же.
Большое спасибо за помощь

Подробнее здесь: https://stackoverflow.com/questions/793 ... r-too-long
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»