интеграция Springboot kafka @retryabletopic не эффективна, я не могу контролировать количество повторных попыток; он непрерывно повторяет. В консоли непрерывно печатаются два типа журналов ошибок: один из них «не может доставить в очередь повторения», а другой - «Отправлять исключение не в транзакции». @Transactional annotation on the message consumer side.
version
JDK1.8 spring-boot-starter-parent:2.7.17 spring-kafka:2.9.13
broker:kafka_2.12-3.6.1
code
https://github.com/limerenceamumu/kafka-study
потребитель
Код: Выделить всё
// 消费者组1:处理订单
@RetryableTopic(
attempts = "4", // 总共尝试 4 次(1次原始 + 3次重试)
backoff = @Backoff(delay = 1000, multiplier = 2.0), // 1s, 2s, 4s 重试
dltTopicSuffix = ".dlt", // 死信队列 Topic 名:order-topic.dlt
include = {Exception.class}, // 重试所有异常类型,
dltStrategy = DltStrategy.FAIL_ON_ERROR // 处理失败时进入 DLQ
)
@KafkaListener(
topics = "order-topic-new",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeOrderMessage(ConsumerRecord record, @org.springframework.messaging.handler.annotation.Payload(required = false) Object payload, Acknowledgment acknowledgment) {
try {
Order order = record.value();
log.info("接收到订单消息: 订单ID={}, 主题={}, 分区={}, 偏移量={}",
order.getOrderId(),
record.topic(),
record.partition(),
record.offset());
// 处理订单
processOrder(order);
//加限定 抛异常
if (order.getCustomerName().equals("ex")) {
throw new RuntimeException("order process error");
}
acknowledgment.acknowledge();
log.info("order process ack: {}", order.getOrderId());
} catch (Exception e) {
log.error("order process error: {}", e.getMessage(), e);
throw e;
}
}
конфигурация
@RequiredArgsConstructor
@EnableScheduling
@Configuration
@EnableKafka
public class KafkaConfig extends RetryTopicConfigurationSupport {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String consumerGroupId;
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(Exception.class)
.backOff(new FixedBackOff(5000, 2));
}
// // 创建订单主题
// @Bean
// public NewTopic orderTopic() {
// Map configs = new HashMap();
// configs.put("retention.ms", "86400000"); // 保留1天
//
// return TopicBuilder.name("order-topic")
// .partitions(3) // 3个分区
// .replicas(1) // 1个副本
// .configs(configs)
// .build();
// }
// 创建订单主题
@Bean
public NewTopic orderTopicDLQ() {
Map configs = new HashMap();
configs.put("retention.ms", "86400000"); // 保留1天
return TopicBuilder.name("order-topic-new.dlt")
.partitions(3) // 3个分区
.replicas(1) // 1个副本
.configs(configs)
.build();
}
// 创建订单确认主题
@Bean
public NewTopic orderConfirmationTopic() {
return TopicBuilder.name("order-confirmation-topic")
.partitions(2)
.replicas(1)
.build();
}
// 生产者配置
@Bean
public ProducerFactory orderProducerFactory() {
Map configProps = new HashMap();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configProps.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有副本确认
configProps.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 开启幂等性
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-transaction"); // 事务ID配置
return new DefaultKafkaProducerFactory(configProps);
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(orderProducerFactory());
}
//
// @Bean("defaultRetryTopicKafkaTemplate")
// public KafkaTemplate defaultRetryTopicKafkaTemplate(ProducerFactory producerFactory) {
// KafkaTemplate template = new KafkaTemplate(producerFactory);
// // 👇 关键:允许非事务发送
// template.setAllowNonTransactional(true);
// return template;
// }
// @Bean
// public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(KafkaTemplate retryKafkaTemplate) {
// return new DeadLetterPublishingRecoverer(retryKafkaTemplate);
// }
// 消费者配置
@Bean
public ConsumerFactory orderConsumerFactory() {
Map props = new HashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.kafkaorderdemo.model");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 从最早的消息开始消费
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 禁用自动提交
return new DefaultKafkaConsumerFactory(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(orderConsumerFactory());
factory.setBatchListener(false);
factory.setReplyTemplate(kafkaTemplate());
// 设置手动确认模式,使Acknowledgment可用
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
// 事务管理器配置
@Bean
public KafkaTransactionManager kafkaTransactionManager() {
KafkaTransactionManager transactionManager = new KafkaTransactionManager(orderProducerFactory());
transactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
return transactionManager;
}
}
2025-10-07 12:01:09 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.s.k.r.DeadLetterPublishingRecovererFactory$1 - Dead-letter publication to order-topic-new-retry-0failed for: order-topic-new-0@0
java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
at org.springframework.util.Assert.state(Assert.java:76)
at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:782)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:674)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:459)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:654)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:562)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:527)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:499)
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:237)
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:191)
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:107)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.springframework.kafka.listener.SeekUtils.doSeeks(SeekUtils.java:104)
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:207)
at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:174)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2854)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2722)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2572)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2448)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2078)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1430)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1394)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1291)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
2025-10-07 12:01:09 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] ERROR o.s.k.listener.DefaultErrorHandler - Failed to determine if this record (order-topic-new-0@0) should be recovererd, including in seeks
org.springframework.kafka.KafkaException: Dead-letter publication to order-topic-new-retry-0failed for: order-topic-new-0@0
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.verifySendResult(DeadLetterPublishingRecoverer.java:683)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.publish(DeadLetterPublishingRecoverer.java:666)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.send(DeadLetterPublishingRecoverer.java:562)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.sendOrThrow(DeadLetterPublishingRecoverer.java:527)
at org.springframework.kafka.listener.DeadLetterPublishingRecoverer.accept(DeadLetterPublishingRecoverer.java:499)
at org.springframework.kafka.listener.FailedRecordTracker.attemptRecovery(FailedRecordTracker.java:237)
at org.springframework.kafka.listener.FailedRecordTracker.recovered(FailedRecordTracker.java:191)
at org.springframework.kafka.listener.SeekUtils.lambda$doSeeks$5(SeekUtils.java:107)
Подробнее здесь: https://stackoverflow.com/questions/797 ... -effective
Мобильная версия