Исключение Kafka — org.apache.kafka.common.errors.NotLeaderOrFollowerExceptionJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Гость
 Исключение Kafka — org.apache.kafka.common.errors.NotLeaderOrFollowerException

Сообщение Гость »

Я развертываю Kafka внутри кластера Kubernetes в режиме высокой доступности (несколько брокеров). Развертывание состоит из
  • Kubernetes
  • Kafka 3.6.1
Обратитесь к следующим файлам, использованным при развертывании:
Dockerfile

Код: Выделить всё

FROM eclipse-temurin:17.0.9_9-jdk-jammy

ENV KAFKA_VERSION=3.6.1
ENV SCALA_VERSION=2.13
ENV KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin

LABEL name="kafka" version=${KAFKA_VERSION}

RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
&& tar xfz /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /opt \
&& rm /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz \
&& ln -s /opt/kafka_${SCALA_VERSION}-${KAFKA_VERSION} ${KAFKA_HOME} \
&& rm -rf /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

COPY ./entrypoint.sh /
RUN ["chmod", "+x", "/entrypoint.sh"]
ENTRYPOINT ["/entrypoint.sh"]
entrypoint.sh

Код: Выделить всё

#!/bin/bash

NODE_ID=${HOSTNAME:6}
LISTENERS="SASL://:9092,CONTROLLER://:9093,INTERNAL://:29092"

ADVERTISED_LISTENERS="SASL://kraft-$NODE_ID:9092,INTERNAL://kafka-$NODE_ID.$SERVICE.$NAMESPACE.svc.cluster.local:29092"

CONTROLLER_QUORUM_VOTERS=""
for i in $( seq 0 $REPLICAS); do
if [[ $i != $REPLICAS ]]; then
CONTROLLER_QUORUM_VOTERS="$CONTROLLER_QUORUM_VOTERS$i@kafka-$i.$SERVICE.$NAMESPACE.svc.cluster.local:9093,"
else
CONTROLLER_QUORUM_VOTERS=${CONTROLLER_QUORUM_VOTERS::-1}
fi
done

mkdir -p $SHARE_DIR/$NODE_ID

if [[ ! -f "$SHARE_DIR/cluster_id" && "$NODE_ID" = "0" ]]; then
CLUSTER_ID=$(kafka-storage.sh random-uuid)
echo $CLUSTER_ID > $SHARE_DIR/cluster_id
else
CLUSTER_ID=$(cat $SHARE_DIR/cluster_id)
fi

sed -e "s+^node.id=.*+node.id=$NODE_ID+" \
-e "s+^controller.quorum.voters=.*+controller.quorum.voters=$CONTROLLER_QUORUM_VOTERS+" \
-e "s+^listeners=.*+listeners=$LISTENERS+" \
-e "s+^advertised.listeners=.*+advertised.listeners=$ADVERTISED_LISTENERS+" \
-e "s+^log.dirs=.*+log.dirs=$SHARE_DIR/$NODE_ID+" \
/opt/kafka/config/kraft/server.properties > server.properties.updated \
&& mv server.properties.updated /opt/kafka/config/kraft/server.properties

JAAS="org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\";"

echo -e "\nlistener.name.sasl.plain.sasl.jaas.config=${JAAS}" >> /opt/kafka/config/kraft/server.properties
echo -e "\nsasl.enabled.mechanisms=PLAIN" >> /opt/kafka/config/kraft/server.properties
echo -e "\nlistener.security.protocol.map=SASL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT" >> /opt/kafka/config/kraft/server.properties
echo -e "\ninter.broker.listener.name=INTERNAL" >>  /opt/kafka/config/kraft/server.properties

kafka-storage.sh format -t $CLUSTER_ID -c /opt/kafka/config/kraft/server.properties

exec kafka-server-start.sh /opt/kafka/config/kraft/server.properties
kafka.yaml

Код: Выделить всё

apiVersion: v1
kind: Namespace
metadata:
name: kafka-kraft
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: kafka-pv-volume
labels:
type: local
spec:
storageClassName: manual
capacity:
storage: 10Gi
accessModes:
- ReadWriteOnce
hostPath:
path: '/mnt/data'
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: kafka-pv-claim
namespace: kafka-kraft
spec:
storageClassName: manual
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 3Gi
---
apiVersion: v1
kind: Service
metadata:
name: kafka-svc
labels:
app: kafka-app
namespace: kafka-kraft
spec:
clusterIP: None
ports:
- name: '9092'
port: 9092
protocol: TCP
targetPort: 9092
selector:
app: kafka-app
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: kafka
labels:
app: kafka-app
namespace: kafka-kraft
spec:
serviceName: kafka-svc
replicas: 5
selector:
matchLabels:
app: kafka-app
template:
metadata:
labels:
app: kafka-app
spec:
volumes:
- name: kafka-storage
persistentVolumeClaim:
claimName: kafka-pv-claim
containers:
- name: kafka-container
image: myimage/kafka-kraft:1.0
ports:
- containerPort: 9092
- containerPort: 9093
env:
- name: REPLICAS
value: '5'
- name: SERVICE
value: kafka-svc
- name: NAMESPACE
value: kafka-kraft
- name: SHARE_DIR
value: /mnt/kafka
volumeMounts:
- name: kafka-storage
mountPath: /mnt/kafka
После развертывания все контейнеры запускаются и работают. Затем я подключаю брокера, используя следующую команду

Код: Выделить всё

.\kafka-topics.bat --bootstrap-server kraft-0:9092,kraft-1:9092,kraft-2:9092,kraft-3:9092,kraft-4:9092 --command-config producer.properties --topic hello --create --replication-factor 5
producer.properties

Код: Выделить всё

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=admin password=admin-secret;
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
metadata.max.age.ms=1000
Отображается приглашение ввести сообщение. В образце текста выдается следующая ошибка.

Код: Выделить всё

[Producer clientId=console-producer] Received invalid metadata error in produce request on partition hello2-1 due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests intended only for the leader, this error indicates that the broker is not the current leader. For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.. Going to request metadata update now
Что я уже пробовал
  • Пробовал Zookeeper и крафт-режим
    Попробовал удалить и воссоздать темы (работает случайным образом)
К сожалению, проблема не устранена и сообщения не создаются.

Подробнее здесь: https://stackoverflow.com/questions/778 ... rexception
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»