Я создал Java-приложение и сгенерировал из него файл jar, а затем загрузил файл jar в Apache Storm, используя синтаксис Storm jar .jar . У меня также есть несколько операторов logger.debug в моем Java-файле для целей отладки.
Затем я использую пользовательский интерфейс Storm, чтобы проверить, работает ли моя топология.Результаты:
- Загруженная мной топология отображается в пользовательском интерфейсе Storm как АКТИВНАЯ, а носик и болт отображаются в списке. правильно
- В пользовательском интерфейсе Storm указано, что носик не отправлял никаких сообщений, но через несколько минут использовал значительный объем оперативной памяти.
- Ни один из моих операторов logger.debug() не отображается ни в одном из журналов.
Я вижу следующее в журнале, но оно появляется только раз в 30 секунд (веб-сокет отправляет данные несколько раз в секунду)
Код: Выделить всё
[INFO] Processing received TUPLE: source: __system:-1, stream: __tick, id: {}, [30] PROC_START_TIME(sampled): null EXEC_START_TIME(sampled): null for TASK: -2
Код: Выделить всё
public static class PrintBolt extends BaseRichBolt {
private static final Logger logger = LogManager.getLogger(PrintBolt.class);
OutputCollector collector;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple tuple) {
String message = tuple.getStringByField("message");
logger.debug("Received message: " + message);
collector.ack(tuple);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
Код: Выделить всё
public static class WebSocketSpout extends BaseRichSpout {
private static final Logger logger = LogManager.getLogger(WebSocketSpout.class);
private SpoutOutputCollector collector;
private BlockingQueue queue = new LinkedBlockingQueue();
private WebSocketClient client;
@Override
public void nextTuple() {
String message = queue.poll();
if (message != null) {
collector.emit(new Values(message));
logger.error(message);
} else {
Utils.sleep(50);
}
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
try {
client = new WebSocketClient(new URI("")) {
@Override
public void onOpen(ServerHandshake handshakedata) {
logger.debug("Connected to WebSocket");
}
@Override
public void onMessage(String message) {
queue.offer(message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
logger.debug("Disconnected from WebSocket");
}
@Override
public void onError(Exception ex) {
ex.printStackTrace();
}
};
client.connect();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("message"));
}
}
Подробнее здесь: https://stackoverflow.com/questions/791 ... ache-storm
Мобильная версия