Код: Выделить всё
ConsumerFactoryКод: Выделить всё
@KafkaListener(
autoStartup = "false",
topics = "someTopic",
containerFactory = "manualAckKafkaListenerContainerFactory",
concurrency = "8",
batch = "true",
clientIdPrefix = "prefix")
@Override
public void accept(List messages, Acknowledgment ack) {
try {
processMessages(messages);
ack.acknowledge();
} catch (Exception e) {
logger.error(e.getMessage(), e);
ack.nack(0, Duration.ofSeconds(30));
}
}
при включении журналов отладки и установке параллелизма на 1 я получаю следующий вывод журнала:
Код: Выделить всё
2024-12-16T19:06:12.593+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 1 records
2024-12-16T19:06:12.599+01:00 DEBUG 14842 --- [some-service] [ kafka-1] l.a.BatchMessagingMessageListenerAdapter : Processing [GenericMessage [payload=[GenericMessage [payload=byte[326], headers={kafka_offset=0, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@60c6b27d, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=some_offer_id, kafka_receivedTopic=someTopic, kafka_receivedTimestamp=1613959321000, kafka_acknowledgment=Acknowledgment for org.apache.kafka.clients.consumer.ConsumerRecords@4123971f, kafka_groupId=someListener}]], headers={id=0048a31c-d7fc-9b94-2e6f-3b477607b9d1, timestamp=1734372372599}]]
// stacktrace from the logger in above code is logged here
2024-12-16T19:06:27.655+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:27.662+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Pausing for nack sleep: [someTopic-0]
2024-12-16T19:06:27.662+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:27.675+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:27.677+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:06:32.680+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:06:32.681+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:32.682+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:06:37.683+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:06:37.684+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:37.684+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:06:42.685+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:06:42.686+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:42.686+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:06:47.688+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:06:47.688+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:47.688+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:06:52.689+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:06:52.690+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:52.690+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:06:57.692+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:06:57.694+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:06:57.694+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:07:02.696+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:07:02.696+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:07:02.697+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:07:07.698+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:07:07.702+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:07:07.702+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Still paused for nack sleep
2024-12-16T19:07:12.704+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
2024-12-16T19:07:12.706+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Resumed after nack sleep: [someTopic-0]
2024-12-16T19:07:12.709+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Commit list: {}
2024-12-16T19:07:17.711+01:00 DEBUG 14842 --- [some-service] [ kafka-1] o.s.k.l.KafkaMessageListenerContainer : Received: 0 records
Подробнее здесь: https://stackoverflow.com/questions/792 ... after-nack
Мобильная версия