Извлечение из Kafka sessionStore происходит очень медленно (5-10 секунд). Почему?JAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Извлечение из Kafka sessionStore происходит очень медленно (5-10 секунд). Почему?

Сообщение Anonymous »

В настоящее время я работаю над uni-проектом для потоковой обработки и в настоящее время застрял, пытаясь реализовать хранилище сеансов, поскольку выполнение операции выборки в хранилище, в котором мало ключей, занимает несколько секунд.
Мы передаем некоторые ложные заводские данные, в которых есть некоторые датчики света, которые либо сломаны, либо нет. Поэтому моя идея заключалась в том, чтобы отфильтровать события, в которых свет не был нарушен, а затем создать SessionWindow, чтобы увидеть, как долго каждый датчик освещенности неисправен за раз.
Для достижения поток куда приходят только события с сломанными датчиками света я сделал так:

Код: Выделить всё

        KStream lightBarrierBrokenHBW = hbwTypedStream.filterNot((k, v) ->
v.getData().isI1_light_barrier() && v.getData().isI4_light_barrier());
Затем я сгруппировал поток в зависимости от того, какой световой барьер был преодолен:

Код: Выделить всё

        SessionWindowedKStream sessionizedHbwEvent = lightBarrierBrokenHBW
.groupBy((k, v) -> {
String sensorKey = "unknown";
if (!v.getData().isI4_light_barrier()) {
sensorKey = "i4_light_sensor";
} else if (!v.getData().isI1_light_barrier()) {
sensorKey = "i1_light_sensor";
}
System.out.println("GroupBy: " + k + " -> " + sensorKey);
return sensorKey;
}, Grouped.with(Serdes.String(), hbwEventSerdes))
.windowedBy(sessionWindow);
Затем я выполнил агрегирование и материализовал его в магазине:

Код: Выделить всё

        sessionizedHbwEvent.aggregate(
TimeDifferenceAggregation::new,
TimeDifferenceAggregation::add,
TimeDifferenceAggregation::merge,
Materialized.as("lightSensor")
.withKeySerde(Serdes.String())
.withValueSerde(timeDifferenceAggregationSerde)
.withCachingEnabled()
);
Класс TimeDifferenceAggregation выглядит следующим образом:

Код: Выделить всё

@Setter
@Getter
public class TimeDifferenceAggregation {
@SerializedName("FirstTimestamp")
private Instant firstTimestamp;
@SerializedName("LastTimestamp")
private Instant lastTimestamp;

public TimeDifferenceAggregation() {
this.firstTimestamp = null;
this.lastTimestamp = null;
}

public TimeDifferenceAggregation(Instant firstTimestamp, Instant lastTimestamp) {
this.firstTimestamp = firstTimestamp;
this.lastTimestamp = lastTimestamp;
}

public static TimeDifferenceAggregation add(String Key, HbwEvent event, TimeDifferenceAggregation agg) {
Instant newTime = Instant.parse(event.getTime());

Instant newFirstTimestamp = (agg.firstTimestamp == null || newTime.isBefore(agg.firstTimestamp)) ? newTime : agg.firstTimestamp;
Instant newLastTimestamp = (agg.lastTimestamp == null || newTime.isAfter(agg.lastTimestamp)) ? newTime : agg.lastTimestamp;

return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
}

public static TimeDifferenceAggregation merge(String Key, TimeDifferenceAggregation agg1, TimeDifferenceAggregation agg2) {
Instant newFirstTimestamp;
Instant newLastTimestamp;

if (agg1.firstTimestamp != null && agg2.firstTimestamp != null) {
newFirstTimestamp = agg1.firstTimestamp.isBefore(agg2.firstTimestamp) ? agg1.firstTimestamp : agg2.firstTimestamp;
} else if (agg1.firstTimestamp != null) {
newFirstTimestamp = agg1.firstTimestamp;
} else {
newFirstTimestamp = agg2.firstTimestamp;
}

if (agg1.lastTimestamp != null && agg2.lastTimestamp != null) {
newLastTimestamp = agg1.lastTimestamp.isAfter(agg2.lastTimestamp) ? agg1.lastTimestamp : agg2.lastTimestamp;
} else if (agg1.lastTimestamp != null) {
newLastTimestamp = agg1.lastTimestamp;
} else {
newLastTimestamp = agg2.lastTimestamp;
}

return new TimeDifferenceAggregation(newFirstTimestamp, newLastTimestamp);
}
Технически программа работает как надо, но очень-очень медленно. Так что, если кто-нибудь сталкивался с подобной проблемой или имеет какие-то советы, буду признателен! KafkaStreams запускаются через bean-компонент @Configuration с базовыми конфигурациями. Магазин предоставляется через Bean следующим образом:

Код: Выделить всё

    @Bean
public ReadOnlySessionStore lightSensorStore(KafkaStreams streams) {
return streams.store(StoreQueryParameters.fromNameAndType("lightSensor", QueryableStoreTypes.sessionStore()));
}
В том же приложении я делаю обычное KeyValueStore для какой-то другой агрегации, и это имеет нормальную скорость всего несколько мс. Итак, я действительно не уверен, что я сделал не так с параметром на основе SessionWindow?
Спасибо!
Я попробовал несколько дополнительных настроек StreamsConfig, таких как больше потоков, расширение кеша и т. д. И все это без каких-либо реальных улучшений. Я зарегистрировал все операции и на 100% уверен, что виновником является операция .fetch("i4_light_sensor"). Я также несколько раз запросил конечную точку на случай, если первое чтение было медленным, но результат не улучшился даже после нескольких запросов.
Это конечная точка, которую я тестирую с помощью atm :

Код: Выделить всё

    @GetMapping("/test")
public String getTest() {
Logger logger = LoggerFactory.getLogger(MonitoringRestController.class);

long fetchStartTime = System.currentTimeMillis();
var range = lightSensorStore.fetch("i4_light_sensor");
long fetchEndTime = System.currentTimeMillis();

logger.info("Fetch and process time: {} ms", (fetchEndTime - fetchStartTime));

while(range.hasNext()) {
var next = range.next();
System.out.println(next.key.key());
System.out.println(next.value.getFirstTimestamp());
System.out.println(next.value.getLastTimestamp());
}
range.close();
return "successs";
}
РЕДАКТИРОВАТЬ: Я провел дополнительное тестирование и обнаружил интересное поведение. Я переключился с fetch() на BackFetch(), и операция BacksFetch() сразу же заняла пару мс. Итерация сеансов также прошла так быстро, как и ожидалось, но каким-то образом она застряла в конце? -> Я имею в виду, что как только он распечатал оба сеанса, он просто что-то сделал и закончил:

Код: Выделить всё

    public String getTest() {
Logger logger = LoggerFactory.getLogger(MonitoringRestController.class);

long fetchStartTime = System.currentTimeMillis();
KeyValueIterator range = lightSensorStore.backwardFetch("i4_light_sensor");
long fetchEndTime = System.currentTimeMillis();

logger.info("Fetch and process time: {} ms", (fetchEndTime - fetchStartTime)); // Fetch and process time: 1 ms

long processStartTime = System.currentTimeMillis();

range.forEachRemaining(n ->
logger.info("Key: {}, First Timestamp: {}, Last Timestamp: {}", n.key.key(), n.value.getFirstTimestamp(), n.value.getLastTimestamp())); // both expected sessions got logged instantely

range.close();

long processEndTime = System.currentTimeMillis(); // Processing time: 9438 ms
logger.info("Processing time: {} ms", (processEndTime - processStartTime));

long responseTime = System.currentTimeMillis();
logger.info("Total response time: {} ms", (responseTime - fetchStartTime)); // Total response time: 9440 ms

return "success";
}
Я думаю, что что-то в конце итератора замедляет его работу? Есть ли какая-то особая логика, о которой я не знаю, когда дело доходит до такой итерации по KeyValueIterator?

Подробнее здесь: https://stackoverflow.com/questions/785 ... econds-why
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»