Как включить заголовок DELIVERY_ATTEMPT со стороны потребителя в KAFKA?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Как включить заголовок DELIVERY_ATTEMPT со стороны потребителя в KAFKA?

Сообщение Anonymous »

У меня есть потребительское приложение Kafka с весенней загрузкой. Итак, согласно документации, добавленной Factory.getContainerProperties().setDeliveryAttemptHeader(true); ниже, и все еще не удается получить заголовок DeliveryAttempt в записи потребителя в прослушивателе kafka. Что мне не хватает? Какие изменения мне нужны, чтобы получить заголовок DeliveryAttempt на стороне потребителя?
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {

final ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(testErrorHandler);
factory.setConcurrency(10);

factory.getContainerProperties().setDeliveryAttemptHeader(true);

return factory;
}

ErrorHandler:
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class TestErrorHandler implements CommonErrorHandler {

@Override
public void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer,
MessageListenerContainer container) {

LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
}

}

В приведенном выше классе я реализовал CommonErrorHander (который внутренне расширяет класс DeliveryAttemptAware).
Потребитель:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class DataConsumer {

@KafkaListener(topics = { "topic1" }, groupId = "test-group-1", containerFactory = "kafkaListenerContainerFactory")
public void listenTestEvents(final ConsumerRecord message) {

log.info("Received TestEvent message :{}, headers: {}", message, message.headers());

if (true) {
throw new RuntimeException();
}

}
}

application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: test-group-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Конфигурация приложения:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import com.tmobile.retrykafkaconsume.kafka.TestErrorHandler;

@Configuration
public class AppConfig {

@Autowired
private TestErrorHandler testErrorHandler;

@Bean
public ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties());
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {

final ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(testErrorHandler);
factory.setConcurrency(10);

factory.getContainerProperties().setDeliveryAttemptHeader(true);

return factory;
}

}


Подробнее здесь: https://stackoverflow.com/questions/717 ... e-in-kafka
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»