Я хочу получить следующие три временные метки.
Время события: Момент времени, когда произошло событие или запись данных, т. е. было первоначально создано «источником».
Время обработки: Момент времени, когда событие или запись данных обрабатывается приложением потоковой обработки, т. е. когда запись обрабатывается. потребляется.
Время приема: Момент времени, когда событие или запись данных сохраняется в разделе темы с помощью Брокер Kafka.
Это код моего приложения для потоков:
Код: Выделить всё
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // pass from env localhost:9092 ,BROKER_URL + ":9092"
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream source_o365_user_activity = builder.stream("o365_user_activity");
source_o365_user_activity.flatMapValues(new ValueMapper() {
@Override
public Iterable apply(String value) {
System.out.println("========> o365_user_activity_by_date Log: " + value);
ArrayList keywords = new ArrayList();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString()); // UTC TIME
send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
send.put("user_id", received.get("UserId"));
send.put("operation", received.get("Operation"));
send.put("workload", received.get("Workload"));
keywords.add(send.toString());
} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to("o365_user_activity_by_date");
Теперь с каждой записью, которую я хочу отправить, время события, время обработки и время приема, встроенное в полезную нагрузку.
Я просмотрел FailOnInvalidTimestamp и WalllockTimestampExtractor, но не понимаю, как их использовать.
Пожалуйста, подскажите, как это сделать. смогу ли я добиться этого.
Подробнее здесь: https://stackoverflow.com/questions/491 ... ka-streams