Я столкнулся с странной проблемой с моим продюсером Кафки. Я использую версию сервера /клиента Kafka-0.11.
У меня есть один зооукепер и один брокерский узел Kafka. Кроме того, я создал тему «События» с 3 разделами: < /p>
Topic:events PartitionCount:3 ReplicationFactor:1 Configs:
Topic: events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: events Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: events Partition: 2 Leader: 0 Replicas: 0 Isr: 0
< /code>
В моем коде Java я создаю производителя со следующими свойствами: < /p>
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(MAX_BLOCK_MS_CONFIG, 30000);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(PARTITIONER_CLASS_CONFIG, KafkaCustomPartitioner.class);
this.producer = new KafkaProducer(props);
< /code>
Кроме того, я добавил обратный вызов к методу продюсера#send (), который добавляет неудачное сообщение в очередь, которая итератируется другим поток «повторное распределение» в цикле: < /p>
this.producer.send(producerRecord, new ProducerCallback(producerRecord.value(), topic));
private class ProducerCallback implements Callback {
private final String message;
private final String topic;
public ProducerCallback(String message, String topic) {
this.message = message;
this.topic = topic;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
if (ex != null) {
logger.error("Kafka producer error. Topic: " + topic +
".Message will be added into failed messages queue.", ex);
failedMessagesQueue.enqueue(SerializationUtils.serialize(new FailedMessage(topic, message)));
}
}
}
private class ResenderThread extends Thread {
private volatile boolean running = true;
public void stopGracefully() {
running = false;
}
@Override
public void run() {
while (running) {
try {
byte[] val = failedMessagesQueue.peek();
if (val != null) {
FailedMessage failedMessage = SerializationUtils.deserialize(val);
ProducerRecord record;
if (topic.equals(failedMessage.getTopic())) {
String messageKey = generateMessageKey(failedMessage.getMessage());
record = createProducerRecordWithKey(failedMessage.getMessage(), messageKey, failedMessage.getTopic());
} else {
record = new ProducerRecord(failedMessage.getTopic(), failedMessage.getMessage());
}
try {
this.producer.send(record).get();
failedMessagesQueue.dequeue();
} catch (Exception e) {
logger.debug("Kafka message resending attempt was failed. Topic " + failedMessage.getTopic() +
" Partition. " + record.partition() + ". " + e.getMessage());
}
}
Thread.sleep(200);
} catch (Exception e) {
logger.error("Error resending an event", e);
break;
}
}
}
}
< /code>
Все работает нормально, пока я не решил протестировать сценарий брокера Kafka Kill /re-start: < /p>
Я убил свой брокерский узел Kafka и отправил 5 сообщений, используя мой производитель Kafka. Следующее сообщение было зарегистрировано моим приложением продюсера: < /p>
....the application works fine....
// kafka broker was killed
2017-11-10 09:20:44,594 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,646 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,700 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,759 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,802 WARN [org.apache.kafka.clients.NetworkClient] -
// sent 5 message using producer. message were put to the failedMessagesQueue and "re-sender" thread started resending
2017-11-10 09:20:44,905 ERROR [com.inq.kafka.KafkaETLService] -
....
2017-11-10 09:20:45,070 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:45,129 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:45,170 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:45,217 WARN [org.apache.kafka.clients.NetworkClient] -
// kafka broker was restarted, some strange errors were logged
2017-11-10 09:20:51,103 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:51,205 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:51,308 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:51,114 WARN [org.apache.kafka.clients.producer.internals.Sender] -
2017-11-10 09:20:51,114 ERROR [com.inq.kafka.KafkaETLService] -
2017-11-10 09:20:52,485 WARN [org.apache.kafka.clients.NetworkClient] -
// messages were succesfully re-sent and received by consumer..
< /code>
Как я могу избавиться от этих журналов (которые регистрируются каждые 100 мс, когда кафка брокер не работает): < /p>
[org.apache.kafka.clients.NetworkClient] -
< /code>
Почему я получаю следующие ошибки после стартапа брокера Kafka (я не изменил реквизита сервера и не изменял тему). Мне кажется, что эти ошибки являются результатом некоторого процесса синкрунизации между Zookeeper и Kafka во время стартапа брокера, потому что через некоторое время закудчик успешно возмущается моими сообщениями. Я ошибаюсь ?: < /p>
[org.apache.kafka.clients.NetworkClient] -
Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it.
Подробнее здесь: https://stackoverflow.com/questions/472 ... -exception
Кафка брокер может быть недоступным исключением ⇐ JAVA
Программисты JAVA общаются здесь
-
Anonymous
1758099945
Anonymous
Я столкнулся с странной проблемой с моим продюсером Кафки. Я использую версию сервера /клиента Kafka-0.11.
У меня есть один зооукепер и один брокерский узел Kafka. Кроме того, я создал тему «События» с 3 разделами: < /p>
Topic:events PartitionCount:3 ReplicationFactor:1 Configs:
Topic: events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: events Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: events Partition: 2 Leader: 0 Replicas: 0 Isr: 0
< /code>
В моем коде Java я создаю производителя со следующими свойствами: < /p>
Properties props = new Properties();
props.put(BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(MAX_BLOCK_MS_CONFIG, 30000);
props.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(PARTITIONER_CLASS_CONFIG, KafkaCustomPartitioner.class);
this.producer = new KafkaProducer(props);
< /code>
Кроме того, я добавил обратный вызов к методу продюсера#send (), который добавляет неудачное сообщение в очередь, которая итератируется другим поток «повторное распределение» в цикле: < /p>
this.producer.send(producerRecord, new ProducerCallback(producerRecord.value(), topic));
private class ProducerCallback implements Callback {
private final String message;
private final String topic;
public ProducerCallback(String message, String topic) {
this.message = message;
this.topic = topic;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception ex) {
if (ex != null) {
logger.error("Kafka producer error. Topic: " + topic +
".Message will be added into failed messages queue.", ex);
failedMessagesQueue.enqueue(SerializationUtils.serialize(new FailedMessage(topic, message)));
}
}
}
private class ResenderThread extends Thread {
private volatile boolean running = true;
public void stopGracefully() {
running = false;
}
@Override
public void run() {
while (running) {
try {
byte[] val = failedMessagesQueue.peek();
if (val != null) {
FailedMessage failedMessage = SerializationUtils.deserialize(val);
ProducerRecord record;
if (topic.equals(failedMessage.getTopic())) {
String messageKey = generateMessageKey(failedMessage.getMessage());
record = createProducerRecordWithKey(failedMessage.getMessage(), messageKey, failedMessage.getTopic());
} else {
record = new ProducerRecord(failedMessage.getTopic(), failedMessage.getMessage());
}
try {
this.producer.send(record).get();
failedMessagesQueue.dequeue();
} catch (Exception e) {
logger.debug("Kafka message resending attempt was failed. Topic " + failedMessage.getTopic() +
" Partition. " + record.partition() + ". " + e.getMessage());
}
}
Thread.sleep(200);
} catch (Exception e) {
logger.error("Error resending an event", e);
break;
}
}
}
}
< /code>
Все работает нормально, пока я не решил протестировать сценарий брокера Kafka Kill /re-start: < /p>
Я убил свой брокерский узел Kafka и отправил 5 сообщений, используя мой производитель Kafka. Следующее сообщение было зарегистрировано моим приложением продюсера: < /p>
....the application works fine....
// kafka broker was killed
2017-11-10 09:20:44,594 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,646 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,700 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,759 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:44,802 WARN [org.apache.kafka.clients.NetworkClient] -
// sent 5 message using producer. message were put to the failedMessagesQueue and "re-sender" thread started resending
2017-11-10 09:20:44,905 ERROR [com.inq.kafka.KafkaETLService] -
....
2017-11-10 09:20:45,070 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:45,129 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:45,170 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:45,217 WARN [org.apache.kafka.clients.NetworkClient] -
// kafka broker was restarted, some strange errors were logged
2017-11-10 09:20:51,103 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:51,205 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:51,308 WARN [org.apache.kafka.clients.NetworkClient] -
2017-11-10 09:20:51,114 WARN [org.apache.kafka.clients.producer.internals.Sender] -
2017-11-10 09:20:51,114 ERROR [com.inq.kafka.KafkaETLService] -
2017-11-10 09:20:52,485 WARN [org.apache.kafka.clients.NetworkClient] -
// messages were succesfully re-sent and received by consumer..
< /code>
Как я могу избавиться от этих журналов (которые регистрируются каждые 100 мс, когда кафка брокер не работает): < /p>
[org.apache.kafka.clients.NetworkClient] -
< /code>
Почему я получаю следующие ошибки после стартапа брокера Kafka (я не изменил реквизита сервера и не изменял тему). Мне кажется, что эти ошибки являются результатом некоторого процесса синкрунизации между Zookeeper и Kafka во время стартапа брокера, потому что через некоторое время закудчик успешно возмущается моими сообщениями. Я ошибаюсь ?: < /p>
[org.apache.kafka.clients.NetworkClient] -
Received unknown topic or partition error in produce request on partition events-0. The topic/partition may not exist or the user may not have Describe access to it.
Подробнее здесь: [url]https://stackoverflow.com/questions/47231775/kafka-broker-may-not-be-available-exception[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия