Invalid value org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class


Posted by Anonymous

I would like to consume messages from Kafka using Reactor Kafka and an SSL bundle. The following configuration, without a bundle, works fine: I can consume messages from an SSL-secured Kafka cluster.

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver(final MeterRegistry meterRegistry, final ObservationRegistry observationRegistry) {
        final Map<String, Object> properties = new HashMap<>();
        // SSL configured directly from JKS keystore and truststore files
        properties.put("security.protocol", "SSL");
        properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
        properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "keyStorePassphrase");
        properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
        properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "trustStorePassphrase");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "host.com:9092");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "testtestfour");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testtesttfour");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
        return KafkaReceiver.create(receiverOptions
                .addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
                .addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
                .consumerListener(new MicrometerConsumerListener(meterRegistry))
                .withObservation(observationRegistry)
                .subscription(Collections.singleton("topic")));
    }

Now I am hoping to use an SSL bundle instead:

    @Bean
    public KafkaReceiver<String, String> kafkaReceiver(MeterRegistry meterRegistry, ObservationRegistry observationRegistry, SslBundles sslBundles) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put("security.protocol", "SSL");
        // Delegate SSL engine creation to the factory named in the error below
        properties.put(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG, SslBundleSslEngineFactory.class.getName());
        // The bundle name must match the one declared under spring.ssl.bundle.pem.*
        properties.put(SslBundle.class.getName(), sslBundles.getBundle("kafka-bundle"));
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.create(properties);
        return KafkaReceiver.create(receiverOptions
                .addAssignListener(p -> LOGGER.info("partitions assigned {}", p))
                .addRevokeListener(p -> LOGGER.info("partitions revoked {}", p))
                .consumerListener(new MicrometerConsumerListener(meterRegistry))
                .withObservation(observationRegistry)
                .subscription(Collections.singleton("topicSource")));
    }

The bundle is declared with these properties:

    spring.ssl.bundle.pem.kafka-bundle.keystore.private-key=[...]
    spring.ssl.bundle.pem.kafka-bundle.keystore.certificate=[...]
    spring.ssl.bundle.pem.kafka-bundle.truststore.certificate=[...]

The values of the three properties are in PEM format, for example:

    -----BEGIN CERTIFICATE-----
    MIID1zCCAr+gAwIBAgIUNM5QQv8IzVQsgSmmdPQNaqyzWs4wDQYJKoZIhvcNAQEL
    BQAwezELMAkGA1UEBhMCWFgxEjAQBgNVBAgMCVN0YXRlTmFtZTERMA8GA1UEBwwI
    ...
    V0IJjcmYjEZbTvpjFKznvaFiOUv+8L7jHQ1/Yf+9c3C8gSjdUfv88m17pqYXd+Ds
    HEmfmNNjht130UyjNCITmLVXyy5p35vWmdf95U3uEbJSnNVtXH8qRmN9oK9mUpDb
    ngX6JBJI7fw7tXoqWSLHNiBODM88fUlQSho8
    -----END CERTIFICATE-----

Unfortunately, the consumer configured with the SSL bundle always fails with:

    reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory could not be found.
    Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory for configuration ssl.engine.factory.class: Class org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory could not be found.
        at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:778)
        at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:531)
        at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:524)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:114)
        at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:134)
        at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:728)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:595)
        at reactor.kafka.receiver.internals.ConsumerFactory.createConsumer(ConsumerFactory.java:34)
        at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler$24(DefaultKafkaReceiver.java:180)
        at reactor.core.publisher.MonoCallable.call(MonoCallable.java:72)
        at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8891)
        at reactor.core.publisher.Flux.subscribeWith(Flux.java:9012)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8856)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8780)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8698)
        at com.MyService.run(MyService.java:53)

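The message says the class "could not be found", which suggests the name given for ssl.engine.factory.class is resolved reflectively when the consumer is created and was not visible to the class loader at that moment. As a rough diagnostic, the JDK-only sketch below checks whether a class name can be loaded at runtime. The names ClasspathCheck and isLoadable are hypothetical, and the sketch assumes the lookup goes through the thread context class loader, which may not match every deployment.

```java
public final class ClasspathCheck {

    private ClasspathCheck() {
    }

    /**
     * Returns true if the named class is visible to the thread context class loader
     * (falling back to this class's own loader when no context loader is set).
     */
    public static boolean isLoadable(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClasspathCheck.class.getClassLoader();
        }
        try {
            // initialize=false: only test visibility, do not run static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the exception message above
        String name = "org.springframework.boot.autoconfigure.kafka.SslBundleSslEngineFactory";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

Calling isLoadable from inside the application, for example just before KafkaReceiver.create, would show whether the class is missing from the runtime classpath or whether the problem lies elsewhere.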
How do I properly configure a Reactor Kafka consumer to use an SSL bundle?

More details here: https://stackoverflow.com/questions/794 ... ngine-fact
