Мы передаем некоторые ложные заводские данные, в которых есть некоторые датчики света, которые либо сломаны, либо нет. Поэтому моя идея заключалась в том, чтобы отфильтровать события, в которых свет не был нарушен, а затем создать SessionWindow, чтобы увидеть, как долго каждый датчик освещенности неисправен за раз.
Для достижения поток куда приходят только события с сломанными датчиками света я сделал так:
Код: Выделить всё
KStream lightBarrierBrokenHBW = hbwTypedStream.filterNot((k, v) ->
v.getData().isI1_light_barrier() && v.getData().isI4_light_barrier());
Код: Выделить всё
SessionWindowedKStream sessionizedHbwEvent = lightBarrierBrokenHBW
.groupBy((k, v) -> {
String sensorKey = "unknown";
if (!v.getData().isI4_light_barrier()) {
sensorKey = "i4_light_sensor";
} else if (!v.getData().isI1_light_barrier()) {
sensorKey = "i1_light_sensor";
}
System.out.println("GroupBy: " + k + " -> " + sensorKey);
return sensorKey;
}, Grouped.with(Serdes.String(), hbwEventSerdes))
.windowedBy(sessionWindow);
Код: Выделить всё
sessionizedHbwEvent.aggregate(
TimeDifferenceAggregation::new,
TimeDifferenceAggregation::add,
TimeDifferenceAggregation::merge,
Materialized.as("lightSensor")
.withKeySerde(Serdes.String())
.withValueSerde(timeDifferenceAggregationSerde)
.withCachingEnabled()
);
Код: Выделить всё
@Setter
@Getter
public class TimeDifferenceAggregation {
@SerializedName("FirstTimestamp")
private Instant firstTimestamp;
@SerializedName("LastTimestamp")
private Instant lastTimestamp;
public TimeDifferenceAggregation() {
this.firstTimestamp = null;
this.lastTimestamp = null;
}
public TimeDifferenceAggregation(Instant firstTimestamp, Instant lastTimestamp) {
this.firstTimestamp = firstTimestamp;
this.lastTimestamp = lastTimestamp;
}
public static TimeDifferenceAggregation add(String Key, HbwEvent event, TimeDifferenceAggregation agg) {
Instant newTime = Instant.parse(event.getTime());
Instant newFirstTimestamp = (agg.firstTimestamp == null || newTime.isBefore(agg.firstTimestamp)) ? newTime : agg.firstTimestamp;
Instant newLastTimestamp = (agg.lastTimestamp == null || newTime.isAfter(agg.lastTimestamp)) ? newTime : agg.lastTimestamp;
return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
}
public static TimeDifferenceAggregation merge(String Key, TimeDifferenceAggregation agg1, TimeDifferenceAggregation agg2) {
Instant newFirstTimestamp;
Instant newLastTimestamp;
if (agg1.firstTimestamp != null && agg2.firstTimestamp != null) {
newFirstTimestamp = agg1.firstTimestamp.isBefore(agg2.firstTimestamp) ? agg1.firstTimestamp : agg2.firstTimestamp;
} else if (agg1.firstTimestamp != null) {
newFirstTimestamp = agg1.firstTimestamp;
} else {
newFirstTimestamp = agg2.firstTimestamp;
}
if (agg1.lastTimestamp != null && agg2.lastTimestamp != null) {
newLastTimestamp = agg1.lastTimestamp.isAfter(agg2.lastTimestamp) ? agg1.lastTimestamp : agg2.lastTimestamp;
} else if (agg1.lastTimestamp != null) {
newLastTimestamp = agg1.lastTimestamp;
} else {
newLastTimestamp = agg2.lastTimestamp;
}
return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
}
Код: Выделить всё
@Bean
public ReadOnlySessionStore lightSensorStore(KafkaStreams streams) {
return streams.store(StoreQueryParameters.fromNameAndType("lightSensor", QueryableStoreTypes.sessionStore()));
}
Спасибо!
Я попробовал несколько дополнительных настроек StreamsConfig, таких как больше потоков, расширение кеша и т. д. И все это без каких-либо реальных улучшений. Я зарегистрировал все операции и на 100% уверен, что виновником является операция .fetch("i4_light_sensor"). Я также несколько раз запросил конечную точку на случай, если первое чтение было медленным, но результат не улучшился даже после нескольких запросов.
Это конечная точка, которую я тестирую с помощью atm :
Код: Выделить всё
@GetMapping("/test")
public String getTest() {
Logger logger = LoggerFactory.getLogger(MonitoringRestController.class);
long fetchStartTime = System.currentTimeMillis();
var range = lightSensorStore.fetch("i4_light_sensor");
long fetchEndTime = System.currentTimeMillis();
logger.info("Fetch and process time: {} ms", (fetchEndTime - fetchStartTime));
while(range.hasNext()) {
var next = range.next();
System.out.println(next.key.key());
System.out.println(next.value.getFirstTimestamp());
System.out.println(next.value.getLastTimestamp());
}
range.close();
return "successs";
}
Код: Выделить всё
public String getTest() {
Logger logger = LoggerFactory.getLogger(MonitoringRestController.class);
long fetchStartTime = System.currentTimeMillis();
KeyValueIterator range = lightSensorStore.backwardFetch("i4_light_sensor");
long fetchEndTime = System.currentTimeMillis();
logger.info("Fetch and process time: {} ms", (fetchEndTime - fetchStartTime)); // Fetch and process time: 1 ms
long processStartTime = System.currentTimeMillis();
range.forEachRemaining(n ->
logger.info("Key: {}, First Timestamp: {}, Last Timestamp: {}", n.key.key(), n.value.getFirstTimestamp(), n.value.getLastTimestamp())); // both expected sessions got logged instantely
range.close();
long processEndTime = System.currentTimeMillis(); // Processing time: 9438 ms
logger.info("Processing time: {} ms", (processEndTime - processStartTime));
long responseTime = System.currentTimeMillis();
logger.info("Total response time: {} ms", (responseTime - fetchStartTime)); // Total response time: 9440 ms
return "success";
}
Подробнее здесь: https://stackoverflow.com/questions/785 ... econds-why
Мобильная версия