Кафка о смещении темы было совершено, несмотря на проблему деессериализацииJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Кафка о смещении темы было совершено, несмотря на проблему деессериализации

Сообщение Anonymous »

Я преднамеренно использовал неправильный десериализатор, чтобы проверить, как Кафка обрабатывает ошибки десериализации. Я обнаружил, что смещение темы Kafka было совершено автоматически, несмотря на ошибку десериализации. < /P>
Я уже реализовал ошибку и не определял ошибку и не дефолррурдандлер, но смещение все еще предпринимается автоматически: < /p>

. PrettyPrint-Override ">server:
port:9292

spring:
kafka:
consumer:
group-id: consumer-group-1
enable-auto-commit: false
bootstrap-servers: localhost:9092
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
spring.json.type.mapping: com.test.kafka_producer.dto.BankTransferEvent:com.test.kafka_consumer.dto.BankTransferEvent
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.json.trusted.packages: com.test.kafka_consumer.dto,com.test.kafka_producer.dto
< /code>
Моя конфигурация: < /p>
package com.test.kafka_consumer.configuration;

import com.test.kafka_consumer.dto.BankTransferEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaConsumerConfig {
@Bean
public DefaultErrorHandler errorHandler() {
// z.B. kein Commit bei Fehlern
DefaultErrorHandler errorHandler = new DefaultErrorHandler(new FixedBackOff(1000L, 2L)); // 2 Retries
errorHandler.setCommitRecovered(false); // verhindert Offset Commit nach Recovery
return errorHandler;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {

ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);

// Manuelles Acknowledgment aktivieren:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

// WICHTIG: Fehlerhandler dem Container zuweisen
factory.setCommonErrorHandler(errorHandler());

return factory;
}
}
< /code>
my kafkalistener: < /p>
package com.test.kafka_consumer.service;

import com.test.kafka_consumer.dto.BankTransferEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {

@KafkaListener(topics = "topic-json",groupId = "consumer-group-2", containerFactory = "kafkaListenerContainerFactory")
public void consumeJsonEvent1(BankTransferEvent bankTransferEvent, Acknowledgment ack){
log.info("Kafka Consumer aufgerufen");
try {
log.info("consumer-JSON-1 consume the event: {" + bankTransferEvent.toString() + "}");
ack.acknowledge();
} catch (Exception e){
log.error("Error bei der Event-Verarbeitung: " + e.getMessage());
}
}
}


Подробнее здесь: https://stackoverflow.com/questions/797 ... on-problem
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»