- Kubernetes
- Kafka 3.6.1
Dockerfile
Код: Выделить всё
# Kafka image on top of the Eclipse Temurin 17 JDK (Ubuntu jammy).
FROM eclipse-temurin:17.0.9_9-jdk-jammy

# Kafka/Scala versions select the release tarball downloaded below.
ENV KAFKA_VERSION=3.6.1
ENV SCALA_VERSION=2.13
# KAFKA_HOME is a version-independent symlink; its bin/ directory is put on PATH
# so the entrypoint can call kafka-storage.sh / kafka-server-start.sh directly.
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin

LABEL name="kafka" version=${KAFKA_VERSION}

# Download, unpack, delete the tarball and create the symlink in a single layer
# so the archive does not remain in the image.
RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
 && rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
 && ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME}

# The entrypoint rewrites server.properties and then starts the broker.
COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"]
entrypoint.sh
Код: Выделить всё
#!/bin/bash
#
# Container entrypoint: configures and starts one Kafka node in KRaft mode.
#
# Required environment:
#   REPLICAS   number of nodes; one controller quorum voter is generated per node
#   SERVICE    name of the headless Service used in the pods' DNS names
#   NAMESPACE  Kubernetes namespace used in the pods' DNS names
#   SHARE_DIR  directory holding the cluster_id file and one log dir per node
# Optional environment:
#   KAFKA_ADMIN_USER / KAFKA_ADMIN_PASSWORD
#              SASL/PLAIN credentials; default to admin / admin-secret, the
#              values that used to be hard-coded here. Override them outside of
#              test setups.
set -euo pipefail

: "${REPLICAS:?REPLICAS must be set}"
: "${SERVICE:?SERVICE must be set}"
: "${NAMESPACE:?NAMESPACE must be set}"
: "${SHARE_DIR:?SHARE_DIR must be set}"

readonly CONFIG=/opt/kafka/config/kraft/server.properties
readonly CLUSTER_ID_FILE="$SHARE_DIR/cluster_id"

die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# The pod hostname ends in "-<ordinal>" (e.g. kafka-0); the ordinal is used as
# node.id. Taking everything after the last dash avoids a fixed offset.
NODE_ID=${HOSTNAME##*-}
[[ "$NODE_ID" =~ ^[0-9]+$ ]] || die "cannot derive a numeric node id from hostname '$HOSTNAME'"
[[ "$REPLICAS" =~ ^[0-9]+$ ]] && (( REPLICAS >= 1 )) || die "REPLICAS must be a positive integer, got '$REPLICAS'"

LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"
ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

# One "<id>@<host>:9093" entry per node, joined with commas.
voters=()
for ((i = 0; i < REPLICAS; i++)); do
  voters+=("$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093")
done
CONTROLLER_QUORUM_VOTERS=$(IFS=,; printf '%s' "${voters[*]}")

mkdir -p "$SHARE_DIR/$NODE_ID"

# Node 0 generates the cluster id once; every other start reads it from the
# shared file.
if [[ ! -f "$CLUSTER_ID_FILE" && "$NODE_ID" == "0" ]]; then
  CLUSTER_ID=$(kafka-storage.sh random-uuid)
  # Write to a temp name and rename so readers never see a partial file.
  printf '%s\n' "$CLUSTER_ID" > "$CLUSTER_ID_FILE.tmp"
  mv -- "$CLUSTER_ID_FILE.tmp" "$CLUSTER_ID_FILE"
else
  # Fail loudly instead of continuing with an empty cluster id.
  [[ -s "$CLUSTER_ID_FILE" ]] || die "cluster id file $CLUSTER_ID_FILE is missing or empty (node 0 has not created it yet?)"
  CLUSTER_ID=$(< "$CLUSTER_ID_FILE")
fi

# "+" is the sed delimiter because the substituted values contain "/".
sed -i \
  -e "s+^node.id=.*+node.id=$NODE_ID+" \
  -e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
  -e "s+^listeners=.*+listeners=$LISTENERS+" \
  -e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
  -e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
  "$CONFIG"

ADMIN_USER=${KAFKA_ADMIN_USER:-admin}
ADMIN_PASSWORD=${KAFKA_ADMIN_PASSWORD:-admin-secret}
JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"${ADMIN_USER}\" password=\"${ADMIN_PASSWORD}\" user_${ADMIN_USER}=\"${ADMIN_PASSWORD}\";"

# Appended after the sed pass; each entry is preceded by a blank line.
{
  printf '\n%s\n' "listener.name.sasl.plain.sasl.jaas.config=${JAAS}"
  printf '\n%s\n' "sasl.enabled.mechanisms=PLAIN"
  printf '\n%s\n' "listener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT"
  printf '\n%s\n' "inter.broker.listener.name=INTERNAL"
} >> "$CONFIG"

# --ignore-formatted makes this a no-op when the log dir was already formatted
# by a previous start, so restarts do not abort under `set -e`.
kafka-storage.sh format --ignore-formatted -t "$CLUSTER_ID" -c "$CONFIG"

exec kafka-server-start.sh "$CONFIG"
Манифесты Kubernetes
Код: Выделить всё
# Namespace that holds the namespaced Kafka resources in this manifest.
apiVersion: v1
kind: Namespace
metadata:
  name: kafka-kraft
---
# Statically provisioned volume backed by a directory on the node (/mnt/data).
apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv-volume
  labels:
    type: local
spec:
  # Same storageClassName as the kafka-pv-claim claim.
  storageClassName: manual
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: '/mnt/data'
---
# Claim mounted by the Kafka pods; requests 3Gi from the "manual" storage class.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-pv-claim
  namespace: kafka-kraft
spec:
  storageClassName: manual
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 3Gi
---
# Headless Service (clusterIP: None) selecting the Kafka pods; its name is the
# one the entrypoint uses in kafka-<n>.<service>.<namespace>.svc.cluster.local.
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  clusterIP: None
  ports:
    - name: '9092'
      port: 9092
      protocol: TCP
      targetPort: 9092
  selector:
    app: kafka-app
---
# Five Kafka pods named kafka-0 .. kafka-4; the entrypoint derives node.id from
# the ordinal in the pod hostname.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  labels:
    app: kafka-app
  namespace: kafka-kraft
spec:
  serviceName: kafka-svc
  # Keep in sync with the REPLICAS env value below.
  replicas: 5
  selector:
    matchLabels:
      app: kafka-app
  template:
    metadata:
      labels:
        app: kafka-app
    spec:
      volumes:
        # Every replica mounts the same claim; the entrypoint keeps per-node
        # data under $SHARE_DIR/<node id> and the shared cluster_id beside it.
        - name: kafka-storage
          persistentVolumeClaim:
            claimName: kafka-pv-claim
      containers:
        - name: kafka-container
          image: myimage/kafka-kraft:1.0
          ports:
            - containerPort: 9092   # SASL listener
            - containerPort: 9093   # CONTROLLER listener
            - containerPort: 29092  # INTERNAL (inter-broker) listener
          env:
            - name: REPLICAS
              value: '5'
            - name: SERVICE
              value: kafka-svc
            - name: NAMESPACE
              value: kafka-kraft
            - name: SHARE_DIR
              value: /mnt/kafka
          volumeMounts:
            - name: kafka-storage
              mountPath: /mnt/kafka
Команда создания топика
Код: Выделить всё
.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5
producer.properties
Код: Выделить всё
# Client-side settings; presumably the producer.properties file passed to the
# CLI via --command-config — confirm.
# SASL/PLAIN login with the same admin / admin-secret pair the broker's JAAS entry defines.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
# NOTE(review): 1000 ms is a very short metadata age — presumably lowered while debugging; confirm.
metadata.max.age.ms=1000
Ошибка при отправке сообщений
Код: Выделить всё
[Producer clientId=console-producer] Received invalid metadata error in produce request on partition hello2-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
- Пробовал и режим с ZooKeeper, и режим KRaft
- Пробовал удалять и заново создавать топики (срабатывает через раз)
Подробнее здесь: https://stackoverflow.com/questions/778 ... rexception