Правильная ли это конфигурация Kafka Consumer Config — при этой настройке Kafka?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Правильная ли это конфигурация Kafka Consumer Config — при этой настройке Kafka?

Сообщение Anonymous »

Мой Kafka Producer создает сообщения со скоростью около .. 350 МБ за 30 секунд..

Код: Выделить всё

Kafka Setup
:

--> 1 экземпляр Zookeeper

--> 3 Kafka Брокеры

--> 1 Производитель Java

--> 1 Java Потребитель

Вот как я создал разделы тем и брокеров:

Код: Выделить всё

bin/kafka-topics.sh --create --zookeeper 10.10.1.5:2181 --replication-factor 1 --partitions 8 --topic test
Остальная конфигурация следующая.

Код: Выделить всё

Producer Code
:

Код: Выделить всё

KeyedMessage publishData = new KeyedMessage(this.topic, data);
producer.send(publishData);
Здесь данные имеют длину 5000 байтов[].

Код: Выделить всё

Producer Config
:

Код: Выделить всё

batch.size = 200
producer.type = async
sflow-topic = test
connect.timeout.ms = 10000
request.required.acks = 0
zk.connect = 10.10.1.5:2181
serializer.class = kafka.serializer.DefaultEncoder
partitioner.class = kafka.producer.DefaultPartitioner
metadata.broker.list = 10.10.1.5:9092,10.10.1.6:9092,10.10.1.7:9092
Я вижу, что мой производитель работает нормально. Проблема в том, что потребитель потребляет сообщения. Даже если потребитель отстает, я не вижу, чтобы мои сообщения потреблялись. (и, в конечном итоге, обрабатывается и вставляется в БД) в одном и том же/равном темпе. Кроме того, я провел несколько тестов на потребителе, где обнаружил, что не все сообщения потребляются моим потребителем. Не знаю, почему :(< /p>

Код: Выделить всё

Consumer Code
:

Код: Выделить всё

 public class FlowConsumer {
private final String topic;
private final ExecutorService threadPool;
private final ConsumerConnector consumer;
private static AppProperties appProperties;
private final ExecutorService processDataThreadPool;

public FlowConsumer() throws Exception {
/**
* Load properties configuration for flowLog4j.properties.
*/
appProperties = AppProperties.loadConfiguration();

/** Assign the flow-topic.. */
this.topic = appProperties.getString(AppConstants.FLOW_TOPIC);
logger.fatal("Topic : "+topic);

/** Initialize the thread pool to consume kafka byte[] streams.. */
this.threadPool = Executors.newFixedThreadPool(20);

/** Initialize the thread pool for processing kafka byte[] messages.. */
this.processDataThreadPool = Executors.newFixedThreadPool(100);

/** Fetch the Consumer Config, by reading the Flow.properties file..  */
this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(ConsumerPropertyConfig.getConsumerConfig(appProperties));

logger.fatal("Consumer : "+consumer);

//new Thread(new Consumer()).start();
threadPool.submit(new Consumer());
}

public void shutdown() {
if (consumer != null) consumer.shutdown();
if (threadPool != null) threadPool.shutdown();
if (processDataThreadPool!= null) processDataThreadPool.shutdown();
}

private class Consumer implements Runnable {

public Consumer() {
logger.fatal("Started Consumer Thread!");
}

@Override
public void run() {
Map topicCountMap = new HashMap();
Map consumerMap = consumer.createMessageStreams(topicCountMap);
List streams = consumerMap.get(topic);
for (final KafkaStream kafkaStream : streams) {
for (MessageAndMetadata messageAndMetadata : kafkaStream) {
processDataThreadPool.submit(new FlowServiceImpl(messageAndMetadata.message()));
}
}
}
}

public static void main(String[] args) throws Exception {
FlowConsumer consumer = new FlowConsumer();

/*try {
Thread.sleep(10000);
} catch (InterruptedException ie) {

}
consumer.shutdown();*/
}
}

Код: Выделить всё

Consumer Config
:

Код: Выделить всё

group.id = group1
flow-topic = test
auto.offset.reset = smallest
auto.commit.interval.ms = 2000
zookeeper.connect = 10.10.1.5:2181
zookeeper.sync.time.ms = 2000
zookeeper.session.timeout.ms = 2000
zookeeper.connection.timeout.ms = 6000

Код: Выделить всё

Question 1
:

Для 3 брокеров, могу ли я/должен ли я создать более 3 разделов? Я читал, что большее количество разделов означает, что я могу добавить больше параллелизма своему потребителю? Но как, используя больше потребительских потоков для одного потребителя? Или имея 3 экземпляра потребителя, каждый из которых имеет по 1 потоку?

Код: Выделить всё

Question 2
:

Правильен/неверен ли мой код пользовательской конфигурации Java?

Может кто-нибудь сказать мне, что я здесь делаю не так?

Подробнее здесь: https://stackoverflow.com/questions/231 ... afka-setup
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Правильная ли это конфигурация Kafka Consumer Config — при этой настройке Kafka?
    Anonymous » » в форуме JAVA
    0 Ответы
    42 Просмотры
    Последнее сообщение Anonymous
  • Это правильная конфигурация Cafka Consumer - под этой настройкой Kafka?
    Anonymous » » в форуме JAVA
    0 Ответы
    8 Просмотры
    Последнее сообщение Anonymous
  • Это правильная конфигурация Cafka Consumer - под этой настройкой Kafka?
    Anonymous » » в форуме JAVA
    0 Ответы
    4 Просмотры
    Последнее сообщение Anonymous
  • Проблема при переназначении Kafka Consumer на другой раздел
    Anonymous » » в форуме Python
    0 Ответы
    56 Просмотры
    Последнее сообщение Anonymous
  • Zookeeper не является распознаваемой опцией при выполнении kafka-console-consumer.sh
    Anonymous » » в форуме JAVA
    0 Ответы
    43 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»