Как извлечь временную метку, встроенную в сообщения в Kafka StreamsJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Как извлечь временную метку, встроенную в сообщения в Kafka Streams

Сообщение Anonymous »

Я хочу извлечь временные метки, встроенные в каждое сообщение, и отправить их в виде полезных данных JSON в свою базу данных.

Я хочу получить следующие три временные метки.

Время события: Момент времени, когда произошло событие или запись данных, т. е. было первоначально создано «источником».

Время обработки: Момент времени, когда событие или запись данных обрабатывается приложением потоковой обработки, т. е. когда запись обрабатывается. потребляется.

Время приема: Момент времени, когда событие или запись данных сохраняется в разделе темы с помощью Брокер Kafka.

Это код моего приложения для потоков:

Код: Выделить всё

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // pass from env localhost:9092 ,BROKER_URL + ":9092"
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream source_o365_user_activity = builder.stream("o365_user_activity");

source_o365_user_activity.flatMapValues(new ValueMapper() {
@Override
public Iterable apply(String value) {
System.out.println("========> o365_user_activity_by_date Log:     " + value);
ArrayList keywords = new ArrayList();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);

send.put("current_date", getCurrentDate().toString()); // UTC TIME
send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
send.put("user_id", received.get("UserId"));
send.put("operation", received.get("Operation"));
send.put("workload", received.get("Workload"));
keywords.add(send.toString());

} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}

return keywords;
}
}).to("o365_user_activity_by_date");
В коде я просто получаю каждую запись, выполняю над ней некоторую потоковую обработку и отправляю ее в другую тему.

Теперь с каждой записью, которую я хочу отправить, время события, время обработки и время приема, встроенное в полезную нагрузку.

Я просмотрел FailOnInvalidTimestamp и WalllockTimestampExtractor, но не понимаю, как их использовать.

Пожалуйста, подскажите, как это сделать. смогу ли я добиться этого.

Подробнее здесь: https://stackoverflow.com/questions/491 ... ka-streams
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»