Как получить все сообщения из темы Кафки и подсчитать их с помощью Java?JAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как получить все сообщения из темы Кафки и подсчитать их с помощью Java?

Сообщение Anonymous »

Этот код иногда выдает мне все сообщения с самого начала и ждет следующего сообщения, а иногда просто ждет еще одного сообщения

Код: Выделить всё

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class TestConsumer{

public static void main(String[] args) {
ConsumerConfig config;
Properties props = new Properties();
props.put("zookeeper.connect","sandbox.hortonworks.com:2181");
props.put("group.id", "group-4");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "200");
config = new ConsumerConfig(props);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector
(config);
String topic = "News";
System.out.println("Running");
Run(consumer,topic);
}

public static void Run(ConsumerConnector consumer,String topic){
HashMap topicCountMap =
new HashMap();
topicCountMap.put(topic, 1);
Map
consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream stream = consumerMap.get(topic).get(0);
ConsumerIterator it =  stream.iterator();
List msgTopicList = new ArrayList();
int count = 0;
System.out.println("Waiting");
while(it.hasNext()){
MessageAndMetadata msgAndData = it.next();
String msg = new String(msgAndData.message());
msgTopicList.add(msg);
String key = "NoKey";
System.out.println(msg);
count++;
}
}
}
Что мне нужно сделать, так это получить все сообщения из темы, отправленные пользователю, и подсчитать их

Как лучше всего это сделать?

версия kafka_2.10-0.8.1.2.2.4.2-2

Подробнее здесь: https://stackoverflow.com/questions/371 ... using-java
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Как получить все сообщения из темы Кафки и подсчитать их с помощью Java?
    Anonymous » » в форуме JAVA
    0 Ответы
    19 Просмотры
    Последнее сообщение Anonymous
  • Как я могу прочитать всегда новейшее (последнее) сообщение с темы Кафки, в которой есть более одного раздела (в моем слу
    Anonymous » » в форуме C#
    0 Ответы
    3 Просмотры
    Последнее сообщение Anonymous
  • Создание темы кафки не удалось
    Anonymous » » в форуме JAVA
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Создание темы кафки не удалось
    Anonymous » » в форуме JAVA
    0 Ответы
    5 Просмотры
    Последнее сообщение Anonymous
  • Создание темы кафки не удалось
    Anonymous » » в форуме JAVA
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous

Вернуться в «JAVA»