Code:
clientId="1test_kafka" \
clientSecret="xxx" \
scope="1test_kafka_access" \
unsecuredLoginStringClaim_sub="thePrincipalName";
I set up an OAuth 2 provider using Keycloak:
Code:
POST https://localhost:8443/realms/myorg/protocol/openid-connect/token
Content-Type: application/x-www-form-urlencoded
grant_type=client_credentials&client_id=test_kafka&client_secret=xxx&scope=test_kafka_access
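For anyone who wants to reproduce the request above outside an HTTP client, here is a minimal Python sketch. The helper name `build_token_request` is made up for illustration. The URL, client id, secret and scope are the ones from the request above. The request is only built, not sent, because sending it needs a running Keycloak and a trusted certificate.

```python
import urllib.parse
import urllib.request


def build_token_request(token_url, client_id, client_secret, scope):
    """Build (but do not send) an OAuth 2 client_credentials token request."""
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope,
    }
    # The token endpoint expects a form-encoded body, not JSON.
    body = urllib.parse.urlencode(form).encode("ascii")
    return urllib.request.Request(
        token_url,
        data=body,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


request = build_token_request(
    "https://localhost:8443/realms/myorg/protocol/openid-connect/token",
    "test_kafka",
    "xxx",
    "test_kafka_access",
)
print(request.get_method(), request.full_url)
print(request.data.decode())
# To actually send it (needs a reachable Keycloak and a trusted certificate):
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode())
```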
Code:
HTTP/1.1 200 OK
Referrer-Policy: no-referrer
X-Frame-Options: SAMEORIGIN
Strict-Transport-Security: max-age=31536000; includeSubDomains
Cache-Control: no-store
X-Content-Type-Options: nosniff
Set-Cookie: KC_RESTART=; Version=1; Expires=Thu, 01-Jan-1970 00:00:10 GMT; Max-Age=0; Path=/realms/myorg/; HttpOnly
Pragma: no-cache
X-XSS-Protection: 1; mode=block
Content-Type: application/json
connection: close
content-length: 1439
{
"access_token": "token",
"expires_in": 300,
"refresh_expires_in": 0,
"token_type": "Bearer",
"not-before-policy": 0,
"scope": "email test_kafka_access profile"
}
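The broker configuration further down expects a specific audience (`account`) and issuer, so it can help to look at the `aud` and `iss` claims inside the returned `access_token`. The sketch below decodes the payload of a JWT without verifying its signature, which is fine for inspection only and not for authentication. The sample token is made up for illustration (the real token is not shown in this post), and `decode_jwt_claims` is a hypothetical helper name.

```python
import base64
import json


def decode_jwt_claims(token):
    """Return the payload claims of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected header.payload.signature")
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore stripped base64url padding
    return json.loads(base64.urlsafe_b64decode(payload))


def _b64url(data):
    """Encode a dict as unpadded base64url JSON (only used to build the sample)."""
    return base64.urlsafe_b64encode(json.dumps(data).encode()).rstrip(b"=").decode()


# Made-up token for illustration; a real one comes from "access_token" above.
sample_token = ".".join([
    _b64url({"alg": "RS256", "typ": "JWT"}),
    _b64url({
        "iss": "https://localhost:8443/realms/myorg",
        "aud": "account",
        "sub": "example-subject",
    }),
    "signature-placeholder",
])

claims = decode_jwt_claims(sample_token)
print(claims["iss"], claims["aud"], claims["sub"])
```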
[img]https://i.sstatic.net/116V4S3L.png[/img]
Now here is my Kafka server.properties:
Code:
# Basic configurations
broker.id=0
num.network.threads=3
num.io.threads=8
log.dirs=/tmp/kafka-logs
num.partitions=1
# Zookeeper settings
zookeeper.connect=localhost:2181
# SASL/OAUTHBEARER mechanism
listeners=SASL_SSL://:9092
advertised.listeners=SASL_SSL://localhost:9092
inter.broker.listener.name=SASL_SSL
listener.name.sasl_ssl.sasl.enabled.mechanisms=OAUTHBEARER
# Inter-broker SASL mechanism configuration
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
# JAAS configuration for OAuth2
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="1test_kafka" \
clientSecret="xxx" \
scope="1test_kafka_access" \
unsecuredLoginStringClaim_sub="thePrincipalName";
# JWKS endpoint and audience/issuer settings
listener.name.sasl_ssl.sasl.oauthbearer.jwks.endpoint.url=https://localhost:8443/realms/myorg/protocol/openid-connect/certs
listener.name.sasl_ssl.sasl.oauthbearer.expected.audience=account
listener.name.sasl_ssl.sasl.oauthbearer.expected.issuer=https://localhost:8443/realms/myorg
# SSL configurations for secure communication
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
# Enable detailed logging (optional)
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
#log4j.logger.org.apache.kafka=DEBUG
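A rough way to sanity-check a properties file like the one above is to parse it and look for signs of the unsecured OAUTHBEARER path. This sketch rests on assumptions: as far as I understand, the `unsecuredLogin*` JAAS options belong to Kafka's development-only unsecured token creation, and a secured setup would normally also set `sasl.login.callback.handler.class`, `sasl.server.callback.handler.class` (with the listener prefix on the broker) and `sasl.oauthbearer.token.endpoint.url`. The helper names `load_properties` and `oauth_findings` are made up, and the sample text copies only a few lines from the file above.

```python
def load_properties(text):
    """Parse Java-style properties text, joining backslash-continued lines."""
    props, pending = {}, ""
    for raw in text.splitlines():
        line = raw.strip()
        if not pending and (not line or line.startswith("#")):
            continue  # blank line or comment
        if line.endswith("\\"):
            pending += line[:-1]  # drop the backslash and keep reading
            continue
        key, _, value = (pending + line).partition("=")
        props[key.strip()] = value.strip()
        pending = ""
    return props


def oauth_findings(props):
    """List hints that the unsecured OAUTHBEARER path may be in use."""
    findings = []
    for key, value in props.items():
        if key.endswith("sasl.jaas.config") and "unsecuredLogin" in value:
            findings.append(key + " sets unsecuredLogin* options")
    # Assumption: a secured setup would set these (possibly listener-prefixed).
    for suffix in ("sasl.login.callback.handler.class",
                   "sasl.server.callback.handler.class",
                   "sasl.oauthbearer.token.endpoint.url"):
        if not any(k.endswith(suffix) for k in props):
            findings.append(suffix + " is not set")
    return findings


sample = r'''
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="1test_kafka" \
clientSecret="xxx" \
scope="1test_kafka_access" \
unsecuredLoginStringClaim_sub="thePrincipalName";
listener.name.sasl_ssl.sasl.oauthbearer.expected.audience=account
'''

for finding in oauth_findings(load_properties(sample)):
    print(finding)
```

The output is a list of hints, not a verdict. The broker log is the more reliable place to see which callback handler is actually in use.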
I did the same for clientSecret, adding a 1.
And for scope, adding a 1.
But Kafka still works fine, and I can still consume and produce messages.
This is my Producer.config file:
Code:
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
# JAAS configuration for the producer
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="thePrincipalName";
# \
# clientId="test_kafka" \
# clientSecret="xxx" \
# scope="test_kafka_access";
# Additional producer settings
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# SSL configurations
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
Code:
# Consumer properties
bootstrap.servers=localhost:9092
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
# JAAS configuration for the consumer
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
unsecuredLoginStringClaim_sub="thePrincipalName" \
clientId="test_kafka" \
clientSecret="xxx" \
scope="test_kafka_access";
# Additional consumer settings
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# SSL configurations
ssl.keystore.location=/Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password=changeit
ssl.truststore.location=/Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password=changeit
Code:
[2024-12-05 17:29:07,548] INFO AdminClientConfig values:
auto.include.jmx.reporter = true
bootstrap.controllers = []
bootstrap.servers = [localhost:9092]
client.dns.lookup = use_all_dns_ips
client.id = confluent-telemetry-reporter-local-producer
confluent.lkc.id = null
confluent.metrics.reporter.bootstrap.servers = kafka-0:9071
confluent.proxy.protocol.client.address = null
confluent.proxy.protocol.client.mode = PROXY
confluent.proxy.protocol.client.port = null
confluent.proxy.protocol.client.version = NONE
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
enable.metrics.push = true
host.resolver.class = class org.apache.kafka.clients.DefaultHostResolver
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.max.ms = 1000
retry.backoff.ms = 500
sasl.client.callback.handler.class = null
sasl.jaas.config = [hidden]
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = OAUTHBEARER
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = [account]
sasl.oauthbearer.expected.issuer = https://localhost:8443/realms/myorg
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = https://localhost:8443/realms/myorg/protocol/openid-connect/certs
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = SASL_SSL
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = /Users/sam/driveD/keystoret-truststore/keystore.jks
ssl.keystore.password = [hidden]
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = /Users/sam/driveD/keystoret-truststore/truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
(org.apache.kafka.clients.admin.AdminClientConfig)
[2024-12-05 17:29:07,582] INFO These configurations '[compression.type, enable.idempotence, acks, sasl.oauthbearer.expected.issuer, sasl.oauthbearer.expected.audience, key.serializer, max.request.size, value.serializer, interceptor.classes, max.in.flight.requests.per.connection, sasl.oauthbearer.jwks.endpoint.url, linger.ms]' were supplied but are not used yet. (org.apache.kafka.clients.admin.AdminClientConfig)
[2024-12-05 17:29:07,582] INFO Kafka version: 7.7.1-ce (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,582] INFO Kafka commitId: e42eb4aa8aba3e1cdae0b5edc71b43747ca85a6d (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,582] INFO Kafka startTimeMs: 1733399947582 (org.apache.kafka.common.utils.AppInfoParser)
[2024-12-05 17:29:07,595] INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
[2024-12-05 17:29:07,611] INFO Successfully validated token with principal thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)
[2024-12-05 17:29:07,615] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='_confluent-telemetry-metrics', numPartitions=12, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='max.message.bytes', value='10485760'), CreateableTopicConfig(name='message.timestamp.type', value='CreateTime'), CreateableTopicConfig(name='min.insync.replicas', value='1'), CreateableTopicConfig(name='retention.ms', value='259200000'), CreateableTopicConfig(name='segment.ms', value='14400000'), CreateableTopicConfig(name='retention.bytes', value='-1')], linkName=null, mirrorTopic=null, sourceTopicId=AAAAAAAAAAAAAAAAAAAAAA, mirrorStartOffsetSpec=-9223372036854775808, mirrorStartOffsets=[]) (kafka.server.ZkAdminManager)
[2024-12-05 17:28:24,577] INFO [Admin Manager on Broker 0]: Error processing create topic request CreatableTopic(name='_confluent-command', numPartitions=1, replicationFactor=3, assignments=[], configs=[CreateableTopicConfig(name='cleanup.policy', value='compact'), CreateableTopicConfig(name='min.insync.replicas', value='2')], linkName=null, mirrorTopic=null, sourceTopicId=AAAAAAAAAAAAAAAAAAAAAA, mirrorStartOffsetSpec=-9223372036854775808, mirrorStartOffsets=[]) (kafka.server.ZkAdminManager)
org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 3 larger than available brokers: 1.
[img]https://i.sstatic.net/XdVaztcg.png[/img]
I am not sure whether it is actually using OAuth or not.
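One way to check this from the broker log, sketched under assumptions: each "Successfully validated token" line ends with the class that did the validation, so that class name can be pulled out and inspected. In the log above it is `OAuthBearerUnsecuredValidatorCallbackHandler`, which by its name and package looks like Kafka's unsecured validator rather than one that checks tokens against Keycloak. The regular expression and the helper `validation_handlers` are made up for this post and only match this log format.

```python
import re

# Matches the "Successfully validated token" lines shown in the broker log.
VALIDATED = re.compile(
    r"Successfully validated token with principal (?P<principal>\S+): "
    r".*\((?P<handler>[\w.$]+)\)\s*$"
)


def validation_handlers(log_lines):
    """Map each validator handler class to the principals it accepted."""
    seen = {}
    for line in log_lines:
        match = VALIDATED.search(line)
        if match:
            seen.setdefault(match["handler"], set()).add(match["principal"])
    return seen


# Copied from the broker log above.
log_line = (
    "[2024-12-05 17:29:07,595] INFO Successfully validated token with principal "
    "thePrincipalName: {sub=thePrincipalName, exp=1.733403487269E9, "
    "iat=1.733399887269E9} (org.apache.kafka.common.security.oauthbearer."
    "internals.unsecured.OAuthBearerUnsecuredValidatorCallbackHandler)"
)

for handler, principals in validation_handlers([log_line]).items():
    kind = "unsecured" if ".unsecured." in handler else "other"
    print(kind, handler, sorted(principals))
```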
**How do I configure Kafka correctly so that it uses OAuth 2 with grant_type=client_credentials?**
More details here: https://stackoverflow.com/questions/792 ... credential