У меня есть потребительское приложение Kafka с весенней загрузкой. Итак, согласно документации, добавленной Factory.getContainerProperties().setDeliveryAttemptHeader(true); ниже, и все еще не удается получить заголовок DeliveryAttempt в записи потребителя в прослушивателе kafka. Что мне не хватает? Какие изменения мне нужны, чтобы получить заголовок DeliveryAttempt на стороне потребителя?
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
final ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(testErrorHandler);
factory.setConcurrency(10);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
return factory;
}
ErrorHandler:
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class TestErrorHandler implements CommonErrorHandler {
@Override
public void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer,
MessageListenerContainer container) {
LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
}
}
В приведенном выше классе я реализовал CommonErrorHander (который внутренне расширяет класс DeliveryAttemptAware).
Потребитель:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class DataConsumer {
@KafkaListener(topics = { "topic1" }, groupId = "test-group-1", containerFactory = "kafkaListenerContainerFactory")
public void listenTestEvents(final ConsumerRecord message) {
log.info("Received TestEvent message :{}, headers: {}", message, message.headers());
if (true) {
throw new RuntimeException();
}
}
}
application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: test-group-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Конфигурация приложения:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import com.tmobile.retrykafkaconsume.kafka.TestErrorHandler;
@Configuration
public class AppConfig {
@Autowired
private TestErrorHandler testErrorHandler;
@Bean
public ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
final ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(testErrorHandler);
factory.setConcurrency(10);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
return factory;
}
}
Подробнее здесь: https://stackoverflow.com/questions/717 ... e-in-kafka
Как включить заголовок DELIVERY_ATTEMPT со стороны потребителя в KAFKA? ⇐ JAVA
Программисты JAVA общаются здесь
1762299222
Anonymous
У меня есть потребительское приложение Kafka с весенней загрузкой. Итак, согласно документации, добавленной Factory.getContainerProperties().setDeliveryAttemptHeader(true); ниже, и все еще не удается получить заголовок DeliveryAttempt в записи потребителя в прослушивателе kafka. Что мне не хватает? [b]Какие изменения мне нужны, чтобы получить заголовок DeliveryAttempt на стороне потребителя?[/b]
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
final ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(testErrorHandler);
factory.setConcurrency(10);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
return factory;
}
ErrorHandler:
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class TestErrorHandler implements CommonErrorHandler {
@Override
public void handleRecord(Exception thrownException, ConsumerRecord record, Consumer consumer,
MessageListenerContainer container) {
LogFactory.getLog(getClass()).error("'handleRecord' is not implemented by this handler", thrownException);
}
}
В приведенном выше классе я реализовал CommonErrorHander (который внутренне расширяет класс DeliveryAttemptAware).
Потребитель:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;
@Component
@Slf4j
public class DataConsumer {
@KafkaListener(topics = { "topic1" }, groupId = "test-group-1", containerFactory = "kafkaListenerContainerFactory")
public void listenTestEvents(final ConsumerRecord message) {
log.info("Received TestEvent message :{}, headers: {}", message, message.headers());
if (true) {
throw new RuntimeException();
}
}
}
application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: test-group-1
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Конфигурация приложения:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import com.tmobile.retrykafkaconsume.kafka.TestErrorHandler;
@Configuration
public class AppConfig {
@Autowired
private TestErrorHandler testErrorHandler;
@Bean
public ConsumerFactory consumerFactory(KafkaProperties kafkaProperties) {
return new DefaultKafkaConsumerFactory(kafkaProperties.buildConsumerProperties());
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(
ConsumerFactory consumerFactory) {
final ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(testErrorHandler);
factory.setConcurrency(10);
factory.getContainerProperties().setDeliveryAttemptHeader(true);
return factory;
}
}
Подробнее здесь: [url]https://stackoverflow.com/questions/71767255/how-to-enable-delivery-attempt-header-from-consumer-side-in-kafka[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия