Код: Выделить всё
Kafka Setup< /code>: < /p>
-> 1 экземпляр Zookeeper < /code> < /p>
-> 3 брокеры kafka < /code> < /p>
-> 1 Java производитель < /code> < /p>
-> 1 Java < /code> < /p>
-> 1 Java < /code> < /p>
-> 1 Java. Потребитель < /code> < /p>
вот как я создал тему и разделие брокера < /code>: < /p>
bin/kafka-topics.sh --create --zookeeper 10.10.1.5:2181 --replication-factor 1 --partitions 8 --topic test< /code> < /p>
Остальная часть конфигурации следующим образом .. < /p>
Producer Code< /code>: < /p>
KeyedMessage publishData = new KeyedMessage(this.topic, data);
producer.send(publishData);
Код: Выделить всё
Producer Config< /code>: < /p>
batch.size = 200
producer.type = async
sflow-topic = test
connect.timeout.ms = 10000
request.required.acks = 0
zk.connect = 10.10.1.5:2181
serializer.class = kafka.serializer.DefaultEncoder
partitioner.class = kafka.producer.DefaultPartitioner
metadata.broker.list = 10.10.1.5:9092,10.10.1.6:9092,10.10.1.7:9092
< /code>
Я вижу, что мой производитель работает просто хорошо. Проблема заключается в том, что потребитель потребляет сообщения. Даже если потребитель отстает, я не вижу, чтобы мои сообщения использовали (и в конечном итоге обрабатывали, и вставлен в БД) в одном и том же /равных темпах .. Также я провел несколько тестов на потребителе, где я обнаружил, что не все сообщения поглощены моим потребителем. Не уверен, почему. /> Consumer Code< /code>: < /p>
public class FlowConsumer {
private final String topic;
private final ExecutorService threadPool;
private final ConsumerConnector consumer;
private static AppProperties appProperties;
private final ExecutorService processDataThreadPool;
public FlowConsumer() throws Exception {
/**
* Load properties configuration for flowLog4j.properties.
*/
appProperties = AppProperties.loadConfiguration();
/** Assign the flow-topic.. */
this.topic = appProperties.getString(AppConstants.FLOW_TOPIC);
logger.fatal("Topic : "+topic);
/** Initialize the thread pool to consume kafka byte[] streams.. */
this.threadPool = Executors.newFixedThreadPool(20);
/** Initialize the thread pool for processing kafka byte[] messages.. */
this.processDataThreadPool = Executors.newFixedThreadPool(100);
/** Fetch the Consumer Config, by reading the Flow.properties file.. */
this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerPropertyConfig.getConsumerConfig(appProperties));
logger.fatal("Consumer : "+consumer);
//new Thread(new Consumer()).start();
threadPool.submit(new Consumer());
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (threadPool != null) threadPool.shutdown();
if (processDataThreadPool!= null) processDataThreadPool.shutdown();
}
private class Consumer implements Runnable {
public Consumer() {
logger.fatal("Started Consumer Thread!");
}
@Override
public void run() {
Map topicCountMap = new HashMap();
Map consumerMap = consumer.createMessageStreams(topicCountMap);
List streams = consumerMap.get(topic);
for (final KafkaStream kafkaStream : streams) {
for (MessageAndMetadata messageAndMetadata : kafkaStream) {
processDataThreadPool.submit(new FlowServiceImpl(messageAndMetadata.message()));
}
}
}
}
public static void main(String[] args) throws Exception {
FlowConsumer consumer = new FlowConsumer();
/*try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
consumer.shutdown();*/
}
}
< /code>
Consumer Config< /code>: < /p>
group.id = group1
flow-topic = test
auto.offset.reset = smallest
auto.commit.interval.ms = 2000
zookeeper.connect = 10.10.1.5:2181
zookeeper.sync.time.ms = 2000
zookeeper.session.timeout.ms = 2000
zookeeper.connection.timeout.ms = 6000
< /code>
Question 1
Для 3 брокеров, могу/должен ли я создать более 3 разделов? Я читаю, что больше разделов означает, что я могу добавить больше параллелизма своему потребителю? Но как, используя больше потребительского потока на одного потребителя? Или имея 3 экземпляра потребителя, имея по 1 потоке каждый? < /P>
Код: Выделить всё
Question 2< /code>: < /p>
Is my Java consumer config code correct/wrong
может кто -нибудь скажите мне, что я здесь делаю не так? < /p>
Подробнее здесь: https://stackoverflow.com/questions/231 ... afka-setup