Это правильная конфигурация Cafka Consumer - под этой настройкой Kafka?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Это правильная конфигурация Cafka Consumer - под этой настройкой Kafka?

Сообщение Anonymous »

My Kafka Producer производит сообщения со скоростью около .. 350 МБ на 30 секунд ..

Код: Выделить всё

Kafka Setup< /code>: < /p>

-> 1 экземпляр Zookeeper < /code> < /p>

-> 3 брокеры kafka < /code> < /p>

-> 1 Java производитель < /code> < /p>

-> 1 Java < /code> < /p>
-> 1 Java < /code> < /p>
-> 1 Java. Потребитель < /code> < /p>

вот как я создал тему и разделие брокера < /code>: < /p>

bin/kafka-topics.sh --create --zookeeper 10.10.1.5:2181 --replication-factor 1 --partitions 8 --topic test< /code> < /p>

Остальная часть конфигурации следующим образом .. < /p>

Producer Code< /code>: < /p>

KeyedMessage publishData = new KeyedMessage(this.topic, data);
producer.send(publishData);
Здесь, data - 5000 длина byte [] .

Код: Выделить всё

Producer Config< /code>: < /p>

batch.size = 200
producer.type = async
sflow-topic = test
connect.timeout.ms = 10000
request.required.acks = 0
zk.connect = 10.10.1.5:2181
serializer.class = kafka.serializer.DefaultEncoder
partitioner.class = kafka.producer.DefaultPartitioner
metadata.broker.list = 10.10.1.5:9092,10.10.1.6:9092,10.10.1.7:9092
< /code>

Я вижу, что мой производитель работает просто хорошо. Проблема заключается в том, что потребитель потребляет сообщения. Даже если потребитель отстает, я не вижу, чтобы мои сообщения использовали (и в конечном итоге обрабатывали, и вставлен в БД) в одном и том же /равных темпах .. Также я провел несколько тестов на потребителе, где я обнаружил, что не все сообщения поглощены моим потребителем. Не уверен, почему. /> Consumer Code< /code>: < /p>

 public class FlowConsumer {
private final String topic;
private final ExecutorService threadPool;
private final ConsumerConnector consumer;
private static AppProperties appProperties;
private final ExecutorService processDataThreadPool;

public FlowConsumer() throws Exception {
/**
* Load properties configuration for flowLog4j.properties.
*/
appProperties = AppProperties.loadConfiguration();

/** Assign the flow-topic.. */
this.topic = appProperties.getString(AppConstants.FLOW_TOPIC);
logger.fatal("Topic : "+topic);

/** Initialize the thread pool to consume kafka byte[] streams.. */
this.threadPool = Executors.newFixedThreadPool(20);

/** Initialize the thread pool for processing kafka byte[] messages.. */
this.processDataThreadPool = Executors.newFixedThreadPool(100);

/** Fetch the Consumer Config, by reading the Flow.properties file..  */
this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerPropertyConfig.getConsumerConfig(appProperties));

logger.fatal("Consumer : "+consumer);

//new Thread(new Consumer()).start();
threadPool.submit(new Consumer());
}

public void shutdown() {
if (consumer != null) consumer.shutdown();
if (threadPool != null) threadPool.shutdown();
if (processDataThreadPool!= null) processDataThreadPool.shutdown();
}

private class Consumer implements Runnable {

public Consumer() {
logger.fatal("Started Consumer Thread!");
}

@Override
public void run() {
Map topicCountMap = new HashMap();
Map consumerMap = consumer.createMessageStreams(topicCountMap);
List streams = consumerMap.get(topic);
for (final KafkaStream kafkaStream : streams) {
for (MessageAndMetadata messageAndMetadata : kafkaStream) {
processDataThreadPool.submit(new FlowServiceImpl(messageAndMetadata.message()));
}
}
}
}

public static void main(String[] args) throws Exception {
FlowConsumer consumer = new FlowConsumer();

/*try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
consumer.shutdown();*/
}
}
< /code>

Consumer Config< /code>: < /p>

group.id = group1
flow-topic = test
auto.offset.reset = smallest
auto.commit.interval.ms = 2000
zookeeper.connect = 10.10.1.5:2181
zookeeper.sync.time.ms = 2000
zookeeper.session.timeout.ms = 2000
zookeeper.connection.timeout.ms = 6000
< /code>

Question 1
:

Для 3 брокеров, могу/должен ли я создать более 3 разделов? Я читаю, что больше разделов означает, что я могу добавить больше параллелизма своему потребителю? Но как, используя больше потребительского потока на одного потребителя? Или имея 3 экземпляра потребителя, имея по 1 потоке каждый? < /P>

Код: Выделить всё

Question 2< /code>: < /p>

Is my Java consumer config code correct/wrong
? < /p>

может кто -нибудь скажите мне, что я здесь делаю не так? < /p>

Подробнее здесь: https://stackoverflow.com/questions/231 ... afka-setup
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»