Я преднамеренно использовал неправильный десериализатор, чтобы проверить, как Кафка обрабатывает ошибки десериализации. Я обнаружил, что смещение темы Kafka было совершено автоматически, несмотря на ошибку десериализации. < /P>
Я уже реализовал ошибку и не определял ошибку и не дефолррурдандлер, но смещение все еще предпринимается автоматически: < /p>
. PrettyPrint-Override ">server:
port:9292
spring:
kafka:
consumer:
group-id: consumer-group-1
enable-auto-commit: false
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.json.type.mapping: com.test.kafka_producer.dto.BankTransferEvent:com.test.kafka_consumer.dto.BankTransferEvent
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.json.trusted.packages: com.test.kafka_consumer.dto,com.test.kafka_producer.dto
< /code>
Моя конфигурация: < /p>
package com.test.kafka_consumer.configuration;
import com.test.kafka_consumer.dto.BankTransferEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler() {
// z.B. kein Commit bei Fehlern
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L)); // 2 Retries
errorHandler.setCommitRecovered(false); // verhindert Offset Commit nach Recovery
return errorHandler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// Manuelles Acknowledgment aktivieren:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// WICHTIG: Fehlerhandler dem Container zuweisen
factory.setCommonErrorHandler(errorHandler());
return factory;
}
}
< /code>
my kafkalistener: < /p>
package com.test.kafka_consumer.service;
import com.test.kafka_consumer.dto.BankTransferEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "topic-json",groupId = "consumer-group-2", containerFactory = "kafkaListenerContainerFactory")
public void consumeJsonEvent1(BankTransferEvent bankTransferEvent, Acknowledgment ack){
log.info("Kafka Consumer aufgerufen");
try {
log.info("consumer-JSON-1 consume the event: {" + bankTransferEvent.toString() + "}");
ack.acknowledge();
} catch (Exception e){
log.error("Error bei der Event-Verarbeitung: " + e.getMessage());
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/797 ... on-problem
Кафка о смещении темы было совершено, несмотря на проблему деессериализации ⇐ JAVA
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение