Kafka не удалось обновить метаданныеJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Kafka не удалось обновить метаданные

Сообщение Anonymous »

Я использую Kafka v0.10.1.1 с Spring-boot.

Я пытаюсь создать сообщение в теме Kafka mobile-user, используя приведенный ниже код производителя:

Тема mobile-user имеет 5 разделов и 2 фактора репликации. Я прикрепил свои настройки Kafka в конце вопроса.

Код: Выделить всё

package com.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import com.shephertz.karma.constant.Constants;
import com.shephertz.karma.exception.KarmaException;
import com.shephertz.karma.util.Utils;

/**
* @author Prakash Pandey
*/
@Service
public class NotificationSender {

@Autowired
private KafkaTemplate kafkaTemplate;

private static Logger LOGGER = LoggerFactory.getLogger(NotificationSender.class);

// Send Message
public void sendMessage(String topicName, String message) throws KarmaException {
LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
ListenableFuture result = kafkaTemplate.send(topicName, message);
result.addCallback(new ListenableFutureCallback() {
@Override
public void onSuccess(SendResult result) {
LOGGER.info("sent message='{}'" + " to partition={}" + " with offset={}", message,
result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
}

@Override
public void onFailure(Throwable ex) {
LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + Utils.getStackTrace(ex));

}
});

LOGGER.debug("Payload sent to kafka");
LOGGER.debug("topic: " + topicName + ", payload: " + message);
}
}
Проблема:

Мне удалось отправить сообщение в Kafka, но иногда я получаю эту ошибку:

Код: Выделить всё

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
2017-10-25 06:21:48, [ERROR] [karma-unified-notification-dispatcher - NotificationDispatcherSender - onFailure:43] Exception in sending message to kafka for queryorg.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
at org.springframework.kafka.core.KafkaTemplate$1.onCompletion(KafkaTemplate.java:255)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:486)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:436)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:156)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:241)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:151)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.
Свойства Kafka:

Код: Выделить всё

spring.kafka.producer.retries=5
spring.kafka.producer.batch-size=1000
spring.kafka.producer.request.timeout.ms=60000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=1
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.topic.retention=86400000

spring.zookeeper.hosts=192.20.1.19:2181,10.20.1.20:2181,10.20.1.26:2181
spring.kafka.session.timeout=30000
spring.kafka.connection.timeout=10000
spring.kafka.topic.partition=5
spring.kafka.message.replication=2

spring.kafka.listener.concurrency=1
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=200
spring.kafka.consumer.max-poll-interval-ms=300000
Было бы очень полезно, если бы вы помогли мне решить эту проблему.
Спасибо.

Обратите внимание: я не получаю это сообщение выше каждый раз. Я успешно могу создать сообщение в kafka-topic и успешно использовать его потребителю. Эта ошибка возникает примерно после 1000 успешно созданных сообщений.

Подробнее здесь: https://stackoverflow.com/questions/469 ... e-metadata
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»