Проблемы с приемом данных веб-сокета с помощью Apache StormJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Проблемы с приемом данных веб-сокета с помощью Apache Storm

Сообщение Anonymous »

Я новичок в Apache Storm и пытаюсь использовать его для приема данных из веб-сокета и вывода их в журналы (а затем сохранения данных в Hive или обработки их с помощью Spark)
Я создал Java-приложение и сгенерировал из него файл jar, а затем загрузил файл jar в Apache Storm, используя синтаксис Storm jar .jar . У меня также есть несколько операторов logger.debug в моем Java-файле для целей отладки.
Затем я использую пользовательский интерфейс Storm, чтобы проверить, работает ли моя топология.Результаты:
  • Загруженная мной топология отображается в пользовательском интерфейсе Storm как АКТИВНАЯ, а носик и болт отображаются в списке. правильно
  • В пользовательском интерфейсе Storm указано, что носик не отправлял никаких сообщений, но через несколько минут использовал значительный объем оперативной памяти.
  • Ни один из моих операторов logger.debug() не отображается ни в одном из журналов.
    Я вижу следующее в журнале, но оно появляется только раз в 30 секунд (веб-сокет отправляет данные несколько раз в секунду)

Код: Выделить всё

[INFO] Processing received TUPLE: source: __system:-1, stream: __tick, id: {}, [30] PROC_START_TIME(sampled): null EXEC_START_TIME(sampled): null for TASK: -2
Это PrintBolt, который должен регистрировать сообщения:

Код: Выделить всё

public static class PrintBolt extends BaseRichBolt {
private static final Logger logger = LogManager.getLogger(PrintBolt.class);
OutputCollector collector;

@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
String message = tuple.getStringByField("message");
logger.debug("Received message: " + message);
collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

}
а вот носик:

Код: Выделить всё

public static class WebSocketSpout extends BaseRichSpout {
private static final Logger logger = LogManager.getLogger(WebSocketSpout.class);
private SpoutOutputCollector collector;
private BlockingQueue queue = new LinkedBlockingQueue();
private WebSocketClient client;

@Override
public void nextTuple() {
String message = queue.poll();
if (message != null) {
collector.emit(new Values(message));
logger.error(message);
} else {
Utils.sleep(50);
}
}

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
try {
client = new WebSocketClient(new URI("")) {
@Override
public void onOpen(ServerHandshake handshakedata) {
logger.debug("Connected to WebSocket");
}

@Override
public void onMessage(String message) {
queue.offer(message);
}

@Override
public void onClose(int code, String reason, boolean remote) {
logger.debug("Disconnected from WebSocket");
}

@Override
public void onError(Exception ex) {
ex.printStackTrace();
}
};
client.connect();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}

}
Как мне регистрировать сообщения из моей топологии и почему мой носик не отправляет никаких сообщений? Кроме того, почему я вижу сообщения в своих журналах каждые 30 секунд, если в соответствии с веб-интерфейсом ничего не отправляется?


Подробнее здесь: https://stackoverflow.com/questions/791 ... ache-storm
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»