Слушатель больше не звонил после снаJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Слушатель больше не звонил после сна

Сообщение Anonymous »

Дано:

Код: Выделить всё

ConsumerFactory
с установленным ContainerProperties.AckMode.MANUAL.

Код: Выделить всё

    @KafkaListener(
autoStartup = "false",
topics = "someTopic",
containerFactory = "manualAckKafkaListenerContainerFactory",
concurrency = "8",
batch = "true",
clientIdPrefix = "prefix")
@Override
public void accept(List messages, Acknowledgment ack) {
try {
processMessages(messages);
ack.acknowledge();
} catch (Exception e) {
logger.error(e.getMessage(), e);
ack.nack(0, Duration.ofSeconds(30));
}
}
Как только вызывается nack, метод прослушивателя больше никогда не вызывается, хотя у меня есть сообщение по теме, которое еще не было успешно обработано (отсюда и nack >-позвонить). Я предполагаю, что вызов nack с индексом 0 отбрасывает весь пакет, приостанавливает потребителя на 30 секунд и пытается повторно обработать сообщения. Как я могу это сделать? Я думал, что этот параметр сна nack действует как автоматический выключатель, поддерживая жизнь потребителя и не вызывая перебалансировку.
при включении журналов отладки и установке параллелизма на 1 я получаю следующий вывод журнала:

Код: Выделить всё

2024-12-16T19:06:12.593+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 1 records
2024-12-16T19:06:12.599+01:00 DEBUG 14842 --- [some-service] [        kafka-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[GenericMessage [payload=byte[326], headers={kafka_offset=0, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@60c6b27d, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some_offer_id, kafka_receivedTopic=someTopic, kafka_receivedTimestamp=1613959321000, kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@4123971f, kafka_groupId=someListener}]], headers={id=0048a31c-d7fc-9b94-2e6f-3b477607b9d1, timestamp=1734372372599}]]
// stacktrace from the logger in above code is logged here
2024-12-16T19:06:27.655+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:27.662+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Pausing for nack sleep: [someTopic-0]
2024-12-16T19:06:27.662+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:27.675+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:27.677+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:06:32.680+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:06:32.681+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:32.682+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:06:37.683+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:06:37.684+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:37.684+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:06:42.685+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:06:42.686+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:42.686+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:06:47.688+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:06:47.688+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:47.688+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:06:52.689+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:06:52.690+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:52.690+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:06:57.692+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:06:57.694+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:06:57.694+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:07:02.696+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:07:02.696+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list:  {}
2024-12-16T19:07:02.697+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:07:07.698+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:07:07.702+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:07:07.702+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Still paused for nack sleep
2024-12-16T19:07:12.704+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-12-16T19:07:12.706+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Resumed after nack sleep: [someTopic-0]
2024-12-16T19:07:12.709+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-12-16T19:07:17.711+01:00 DEBUG 14842 --- [some-service] [        kafka-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
сон, кажется, работает, но опрос записей возвращает 0.
при отладке опроса после nack-вызова я завершаю в методе LegacyKafkaConsumer Private ConsumerRecords poll(final Timer timer, Final Boolean includeMetadataInTimeout), который всегда возвращает ConsumerRecords.empty(), потому что Fetch пуст, а таймер истекает через короткое время.

Подробнее здесь: https://stackoverflow.com/questions/792 ... after-nack
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»