Я намеренно использовал неправильный десериализатор, чтобы проверить, как Kafka обрабатывает ошибки десериализации. Я обнаружил, что смещение темы Kafka было зафиксировано автоматически, несмотря на ошибку десериализации.
Я уже реализовал ErrorHandlingDeserializer и DefaultErrorHandler, но смещение по-прежнему фиксируется автоматически, то есть задержек не обнаружено.
My application.yaml:
server:
port:9292
spring:
kafka:
consumer:
group-id: consumer-group-1
enable-auto-commit: false
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.json.type.mapping: com.test.kafka_producer.dto.BankTransferEvent:com.test.kafka_consumer.dto.BankTransferEvent
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.json.trusted.packages: com.test.kafka_consumer.dto,com.test.kafka_producer.dto
Моя конфигурация:
package com.test.kafka_consumer.configuration;
import com.test.kafka_consumer.dto.BankTransferEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler() {
// z.B. kein Commit bei Fehlern
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L)); // 2 Retries
errorHandler.setCommitRecovered(false); // verhindert Offset Commit nach Recovery
return errorHandler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// Manuelles Acknowledgment aktivieren:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// WICHTIG: Fehlerhandler dem Container zuweisen
factory.setCommonErrorHandler(errorHandler());
return factory;
}
}
Мой Kafkalistener:
package com.test.kafka_consumer.service;
import com.test.kafka_consumer.dto.BankTransferEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "topic-json",groupId = "consumer-group-2", containerFactory = "kafkaListenerContainerFactory")
public void consumeJsonEvent1(BankTransferEvent bankTransferEvent, Acknowledgment ack){
log.info("Kafka Consumer aufgerufen");
try {
log.info("consumer-JSON-1 consume the event: {" + bankTransferEvent.toString() + "}");
ack.acknowledge();
} catch (Exception e){
log.error("Error bei der Event-Verarbeitung: " + e.getMessage());
}
}
}
Подробнее здесь: https://stackoverflow.com/questions/797 ... on-problem
Смещение темы Kafka было зафиксировано, несмотря на проблему десериализации ⇐ JAVA
Программисты JAVA общаются здесь
1766407925
Anonymous
Я намеренно использовал неправильный десериализатор, чтобы проверить, как Kafka обрабатывает ошибки десериализации. Я обнаружил, что смещение темы Kafka было зафиксировано автоматически, несмотря на ошибку десериализации.
Я уже реализовал ErrorHandlingDeserializer и DefaultErrorHandler, но смещение по-прежнему фиксируется автоматически, то есть задержек не обнаружено.
My application.yaml:
server:
port:9292
spring:
kafka:
consumer:
group-id: consumer-group-1
enable-auto-commit: false
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.json.type.mapping: com.test.kafka_producer.dto.BankTransferEvent:com.test.kafka_consumer.dto.BankTransferEvent
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.json.trusted.packages: com.test.kafka_consumer.dto,com.test.kafka_producer.dto
Моя конфигурация:
package com.test.kafka_consumer.configuration;
import com.test.kafka_consumer.dto.BankTransferEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler() {
// z.B. kein Commit bei Fehlern
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L)); // 2 Retries
errorHandler.setCommitRecovered(false); // verhindert Offset Commit nach Recovery
return errorHandler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
// Manuelles Acknowledgment aktivieren:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// WICHTIG: Fehlerhandler dem Container zuweisen
factory.setCommonErrorHandler(errorHandler());
return factory;
}
}
Мой Kafkalistener:
package com.test.kafka_consumer.service;
import com.test.kafka_consumer.dto.BankTransferEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class KafkaConsumer {
@KafkaListener(topics = "topic-json",groupId = "consumer-group-2", containerFactory = "kafkaListenerContainerFactory")
public void consumeJsonEvent1(BankTransferEvent bankTransferEvent, Acknowledgment ack){
log.info("Kafka Consumer aufgerufen");
try {
log.info("consumer-JSON-1 consume the event: {" + bankTransferEvent.toString() + "}");
ack.acknowledge();
} catch (Exception e){
log.error("Error bei der Event-Verarbeitung: " + e.getMessage());
}
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/79748476/kafka-topic-offset-was-committed-despite-deserialization-problem[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия