Я учусь использовать пружину для Apache Kafka 3.3.1. Я следую официальной документации здесь, здесь и здесь, чтобы установить сериализатор значения JSON для моего KafKatemplate .
К сожалению, в моем случае инициализация производительности с использованием Map Configs Constructor не устанавливает сериализатор, и мой шаблон использует сериализаторы по умолчанию. Только конструктор, который устанавливает клавишу Config-Map + и сериализаторы значений, работает должным образом. Я хотел бы использовать простой конструктор с одним параметром карты, как он записан в DOC: < /p>
return new DefaultKafkaProducerFactory(producerConfigs())
< /code>
Моя конфигурация Kafka: < /p>
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.producer.bootstrap.servers:kafka-1.hello.com:9092, kafka-2.hello.com:9092}")
private String kafkaBootstrapServers;
@Value("${kafka.producer.enable.idempotence:true}")
private String kafkaProducerEnableIdempotence;
@Value("${kafka.producer.acks:all}")
private String kafkaProducerAcks;
@Value("${kafka.producer.retries:2147483647}")
private String kafkaProducerRetries;
@Value("${kafka.producer.linger.ms:0}")
private String kafkaProducerLingerMs;
@Value("${kafka.producer.delivery.timeout.ms:120000}")
private String kafkaProducerDeliveryTimeoutMs;
@Value("${kafka.producer.request.timeout.ms:30000}")
private String kafkaProducerRequestTimeoutMs;
@Value("${kafka.producer.retry.backoff.ms:100}")
private String kafkaProducerRetryBackoffMs;
@Value("${kafka.producer.retry.backoff.max.ms:1000}")
private String kafkaProducerRetryBackoffMaxMs;
@Value("${kafka.topic.name:topic1}")
private String kafkaTopicName;
@Value("${kafka.topic.partitions:1}")
private int kafkaTopicPartitions;
@Value("${kafka.topic.replicas:1}")
private int kafkaTopicReplicas;
@Bean
public ProducerFactory producerFactory() {
// set the key and value serializer this way does not work
// KafkaTemplate uses the default serializers
//DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(producerConfiguration());
// this sets properly the serializers
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(
producerConfiguration(),
new StringSerializer(),
new JsonDeserializer(Event.class));
factory.setProducerPerThread(true);
return factory;
}
@Bean
public KafkaTemplate kafkaTemplate() {
var factory = producerFactory();
log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
return new KafkaTemplate(factory);
}
@Bean
public KafkaAdmin admin() {
Map configs = new HashMap();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic() {
log.debug(
"creating a new kafka topic: \"{name: \"{}\", partitions: {}, replicas: {}}\"",
kafkaTopicName,
kafkaTopicPartitions,
kafkaTopicReplicas);
return TopicBuilder.name(kafkaTopicName)
.partitions(kafkaTopicPartitions)
.replicas(kafkaTopicReplicas)
.build();
}
private Map producerConfiguration() {
Map configs = new HashMap();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerEnableIdempotence);
configs.put(ProducerConfig.ACKS_CONFIG, kafkaProducerAcks);
configs.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetries);
configs.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerLingerMs);
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeoutMs);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeoutMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerRetryBackoffMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, kafkaProducerRetryBackoffMaxMs);
return configs;
}
private String factoryConfigurationToString(ProducerFactory producerFactory) {
var keySerializer = producerFactory.getKeySerializer();
var keySerializerAsString = Objects.isNull(keySerializer) ? "null" : keySerializer.getClass().getName();
var valueSerializer = producerFactory.getValueSerializer();
var valueSerializerAsString = Objects.isNull(valueSerializer) ? "null" : valueSerializer.getClass().getName();
var sb = new StringBuilder().append("configuration: ").append("{");
producerFactory.
getConfigurationProperties().
forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\", ", key, value)));
sb.setLength(sb.length() - 2);
sb.append("}, ");
sb.append("key-serializer: ").append(keySerializerAsString).append(", ");
sb.append("value-serializer: ").append(valueSerializerAsString);
return sb.toString();
}
}
< /code>
Я всегда проверяю результат, используя и тематический браузер. В 1 -м случае я вижу бинарный контент по теме, а не json.DEBUG 200 --- [kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : initializing a KafkaTemplate using the following setting:
{
"configuration":{
"retries":"5",
"enable.idempotence":"true",
"retry.backoff.max.ms":"1000",
"value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
"request.timeout.ms":"30000",
"acks":"all",
"bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
"delivery.timeout.ms":"120000",
"retry.backoff.ms":"100",
"key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
"linger.ms":"0"
},
"key-serializer":null,
Подробнее здесь: https://stackoverflow.com/questions/793 ... rams-the-m
Установка значения сериализатор работает только через конструктор с 3 параметрами, конфигурация карты не учитывается ⇐ JAVA
Программисты JAVA общаются здесь
1738073440
Anonymous
Я учусь использовать пружину для Apache Kafka 3.3.1. Я следую официальной документации здесь, здесь и здесь, чтобы установить сериализатор значения JSON для моего KafKatemplate .
К сожалению, в моем случае инициализация производительности с использованием Map Configs Constructor не устанавливает сериализатор, и мой шаблон использует сериализаторы по умолчанию. Только конструктор, который устанавливает клавишу Config-Map + и сериализаторы значений, работает должным образом. Я хотел бы использовать простой конструктор с одним параметром карты, как он записан в DOC: < /p>
return new DefaultKafkaProducerFactory(producerConfigs())
< /code>
Моя конфигурация Kafka: < /p>
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableKafka
public class KafkaConfiguration {
@Value("${kafka.producer.bootstrap.servers:kafka-1.hello.com:9092, kafka-2.hello.com:9092}")
private String kafkaBootstrapServers;
@Value("${kafka.producer.enable.idempotence:true}")
private String kafkaProducerEnableIdempotence;
@Value("${kafka.producer.acks:all}")
private String kafkaProducerAcks;
@Value("${kafka.producer.retries:2147483647}")
private String kafkaProducerRetries;
@Value("${kafka.producer.linger.ms:0}")
private String kafkaProducerLingerMs;
@Value("${kafka.producer.delivery.timeout.ms:120000}")
private String kafkaProducerDeliveryTimeoutMs;
@Value("${kafka.producer.request.timeout.ms:30000}")
private String kafkaProducerRequestTimeoutMs;
@Value("${kafka.producer.retry.backoff.ms:100}")
private String kafkaProducerRetryBackoffMs;
@Value("${kafka.producer.retry.backoff.max.ms:1000}")
private String kafkaProducerRetryBackoffMaxMs;
@Value("${kafka.topic.name:topic1}")
private String kafkaTopicName;
@Value("${kafka.topic.partitions:1}")
private int kafkaTopicPartitions;
@Value("${kafka.topic.replicas:1}")
private int kafkaTopicReplicas;
@Bean
public ProducerFactory producerFactory() {
// set the key and value serializer this way does not work
// KafkaTemplate uses the default serializers
//DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(producerConfiguration());
// this sets properly the serializers
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(
producerConfiguration(),
new StringSerializer(),
new JsonDeserializer(Event.class));
factory.setProducerPerThread(true);
return factory;
}
@Bean
public KafkaTemplate kafkaTemplate() {
var factory = producerFactory();
log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
return new KafkaTemplate(factory);
}
@Bean
public KafkaAdmin admin() {
Map configs = new HashMap();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic() {
log.debug(
"creating a new kafka topic: \"{name: \"{}\", partitions: {}, replicas: {}}\"",
kafkaTopicName,
kafkaTopicPartitions,
kafkaTopicReplicas);
return TopicBuilder.name(kafkaTopicName)
.partitions(kafkaTopicPartitions)
.replicas(kafkaTopicReplicas)
.build();
}
private Map producerConfiguration() {
Map configs = new HashMap();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerEnableIdempotence);
configs.put(ProducerConfig.ACKS_CONFIG, kafkaProducerAcks);
configs.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetries);
configs.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerLingerMs);
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeoutMs);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeoutMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerRetryBackoffMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, kafkaProducerRetryBackoffMaxMs);
return configs;
}
private String factoryConfigurationToString(ProducerFactory producerFactory) {
var keySerializer = producerFactory.getKeySerializer();
var keySerializerAsString = Objects.isNull(keySerializer) ? "null" : keySerializer.getClass().getName();
var valueSerializer = producerFactory.getValueSerializer();
var valueSerializerAsString = Objects.isNull(valueSerializer) ? "null" : valueSerializer.getClass().getName();
var sb = new StringBuilder().append("configuration: ").append("{");
producerFactory.
getConfigurationProperties().
forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\", ", key, value)));
sb.setLength(sb.length() - 2);
sb.append("}, ");
sb.append("key-serializer: ").append(keySerializerAsString).append(", ");
sb.append("value-serializer: ").append(valueSerializerAsString);
return sb.toString();
}
}
< /code>
Я всегда проверяю результат, используя и тематический браузер. В 1 -м случае я вижу бинарный контент по теме, а не json.DEBUG 200 --- [kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : initializing a KafkaTemplate using the following setting:
{
"configuration":{
"retries":"5",
"enable.idempotence":"true",
"retry.backoff.max.ms":"1000",
"value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
"request.timeout.ms":"30000",
"acks":"all",
"bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
"delivery.timeout.ms":"120000",
"retry.backoff.ms":"100",
"key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
"linger.ms":"0"
},
"key-serializer":null,
Подробнее здесь: [url]https://stackoverflow.com/questions/79377852/setting-the-value-serializer-only-works-via-the-constructor-with-3-params-the-m[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия