Установка значения сериализатор работает только через конструктор с 3 параметрами, конфигурация карты не учитываетсяJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Установка значения сериализатор работает только через конструктор с 3 параметрами, конфигурация карты не учитывается

Сообщение Anonymous »

Я учусь использовать пружину для Apache Kafka 3.3.1. Я следую официальной документации здесь, здесь и здесь, чтобы установить сериализатор значения JSON для моего KafKatemplate .
К сожалению, в моем случае инициализация производительности с использованием Map Configs Constructor не устанавливает сериализатор, и мой шаблон использует сериализаторы по умолчанию. Только конструктор, который устанавливает клавишу Config-Map + и сериализаторы значений, работает должным образом. Я хотел бы использовать простой конструктор с одним параметром карты, как он записан в DOC: < /p>
return new DefaultKafkaProducerFactory(producerConfigs())
< /code>
Моя конфигурация Kafka: < /p>
@Slf4j
@Configuration
@RequiredArgsConstructor
@EnableKafka
public class KafkaConfiguration {

@Value("${kafka.producer.bootstrap.servers:kafka-1.hello.com:9092, kafka-2.hello.com:9092}")
private String kafkaBootstrapServers;

@Value("${kafka.producer.enable.idempotence:true}")
private String kafkaProducerEnableIdempotence;

@Value("${kafka.producer.acks:all}")
private String kafkaProducerAcks;

@Value("${kafka.producer.retries:2147483647}")
private String kafkaProducerRetries;

@Value("${kafka.producer.linger.ms:0}")
private String kafkaProducerLingerMs;

@Value("${kafka.producer.delivery.timeout.ms:120000}")
private String kafkaProducerDeliveryTimeoutMs;

@Value("${kafka.producer.request.timeout.ms:30000}")
private String kafkaProducerRequestTimeoutMs;

@Value("${kafka.producer.retry.backoff.ms:100}")
private String kafkaProducerRetryBackoffMs;

@Value("${kafka.producer.retry.backoff.max.ms:1000}")
private String kafkaProducerRetryBackoffMaxMs;

@Value("${kafka.topic.name:topic1}")
private String kafkaTopicName;

@Value("${kafka.topic.partitions:1}")
private int kafkaTopicPartitions;

@Value("${kafka.topic.replicas:1}")
private int kafkaTopicReplicas;

@Bean
public ProducerFactory producerFactory() {
// set the key and value serializer this way does not work
// KafkaTemplate uses the default serializers
//DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(producerConfiguration());

// this sets properly the serializers
DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory(
producerConfiguration(),
new StringSerializer(),
new JsonDeserializer(Event.class));
factory.setProducerPerThread(true);
return factory;
}

@Bean
public KafkaTemplate kafkaTemplate() {
var factory = producerFactory();
log.debug("initializing a KafkaTemplate using the following setting: {{}}", factoryConfigurationToString(factory));
return new KafkaTemplate(factory);
}

@Bean
public KafkaAdmin admin() {
Map configs = new HashMap();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic() {
log.debug(
"creating a new kafka topic: \"{name: \"{}\", partitions: {}, replicas: {}}\"",
kafkaTopicName,
kafkaTopicPartitions,
kafkaTopicReplicas);

return TopicBuilder.name(kafkaTopicName)
.partitions(kafkaTopicPartitions)
.replicas(kafkaTopicReplicas)
.build();
}

private Map producerConfiguration() {
Map configs = new HashMap();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, kafkaProducerEnableIdempotence);
configs.put(ProducerConfig.ACKS_CONFIG, kafkaProducerAcks);
configs.put(ProducerConfig.RETRIES_CONFIG, kafkaProducerRetries);
configs.put(ProducerConfig.LINGER_MS_CONFIG, kafkaProducerLingerMs);
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaProducerDeliveryTimeoutMs);
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, kafkaProducerRequestTimeoutMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, kafkaProducerRetryBackoffMs);
configs.put(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, kafkaProducerRetryBackoffMaxMs);
return configs;
}

private String factoryConfigurationToString(ProducerFactory producerFactory) {
var keySerializer = producerFactory.getKeySerializer();
var keySerializerAsString = Objects.isNull(keySerializer) ? "null" : keySerializer.getClass().getName();

var valueSerializer = producerFactory.getValueSerializer();
var valueSerializerAsString = Objects.isNull(valueSerializer) ? "null" : valueSerializer.getClass().getName();

var sb = new StringBuilder().append("configuration: ").append("{");
producerFactory.
getConfigurationProperties().
forEach((key, value) -> sb.append(String.format("\"%s\": \"%s\", ", key, value)));

sb.setLength(sb.length() - 2);
sb.append("}, ");
sb.append("key-serializer: ").append(keySerializerAsString).append(", ");
sb.append("value-serializer: ").append(valueSerializerAsString);
return sb.toString();
}
}
< /code>
Я всегда проверяю результат, используя и тематический браузер. В 1 -м случае я вижу бинарный контент по теме, а не json.DEBUG 200 --- [kafka-producer] [ main] c.r.g.s.m.p.c.KafkaConfiguration : initializing a KafkaTemplate using the following setting:
{
"configuration":{
"retries":"5",
"enable.idempotence":"true",
"retry.backoff.max.ms":"1000",
"value.serializer":"class org.springframework.kafka.support.serializer.JsonSerializer",
"request.timeout.ms":"30000",
"acks":"all",
"bootstrap.servers":"kafka-1.hello.com:9092, kafka-2.hello.com:9092",
"delivery.timeout.ms":"120000",
"retry.backoff.ms":"100",
"key.serializer":"class org.apache.kafka.common.serialization.StringSerializer",
"linger.ms":"0"
},
"key-serializer":null,

Подробнее здесь: https://stackoverflow.com/questions/793 ... rams-the-m
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»