У меня есть KeyedProcessFunction, которая управляет состоянием устройства через класс-оболочку. Доступ к состоянию осуществляется несколько раз с помощью разных методов, и мне интересно, какой подход более эффективен:
Вариант 1: вызывать value() каждый раз, когда это необходимо
Код: Выделить всё
public class DeviceStateManager {
private final ValueState deviceState;
public DeviceStateManager(ValueState deviceState) {
this.deviceState = deviceState;
}
public void updateMetrics(Measurement measurement) throws Exception {
if (deviceState.value() != null) {
DeviceData data = deviceState.value();
data.addMeasurement(measurement);
deviceState.update(data);
}
}
public boolean isAnomalyDetected(double threshold) throws Exception {
return deviceState.value() != null &&
deviceState.value().getAverage() > threshold;
}
public Alert generateAlert() throws Exception {
if (deviceState.value() == null) return null;
return new Alert(
deviceState.value().getDeviceId(),
deviceState.value().getLastValue(),
deviceState.value().getTimestamp()
);
}
}
public class IoTProcessFunction extends KeyedProcessFunction {
private ValueStateDescriptor stateDescriptor;
private transient DeviceStateManager stateManager;
@Override
public void open(Configuration parameters) {
stateDescriptor = new ValueStateDescriptor("device-state", DeviceData.class);
}
@Override
public void processElement(Event event, Context ctx, Collector out) throws Exception {
stateManager = new DeviceStateManager(getRuntimeContext().getState(stateDescriptor));
stateManager.updateMetrics(event.getMeasurement());
if (stateManager.isAnomalyDetected(100.0)) {
Alert alert = stateManager.generateAlert();
out.collect(alert);
}
}
}
Код: Выделить всё
public class DeviceStateManager {
private final ValueState deviceState;
private DeviceData cachedValue;
public DeviceStateManager(ValueState deviceState) throws Exception {
this.deviceState = deviceState;
this.cachedValue = deviceState.value(); // Load once
}
public void updateMetrics(Measurement measurement) throws Exception {
if (cachedValue != null) {
cachedValue.addMeasurement(measurement);
deviceState.update(cachedValue);
}
}
public boolean isAnomalyDetected(double threshold) {
return cachedValue != null && cachedValue.getAverage() > threshold;
}
public Alert generateAlert() {
if (cachedValue == null) return null;
return new Alert(
cachedValue.getDeviceId(),
cachedValue.getLastValue(),
cachedValue.getTimestamp()
);
}
}
// ProcessFunction remains the same
- Выполняет ли ValueState.value() десериализацию при каждом вызове или значение уже кэшируется внутри системы управления состоянием Flink?
- В варианте 1 я вызываю метод deviceState.value() несколько раз (в isAnomalyDetected() он вызывается дважды, в генерироватьAlert() четыре раза). Есть ли снижение производительности?
- В варианте 2 я кэширую значение, но действительно ли это необходимо или я дублирую то, что Flink уже делает внутри себя?
- Какая лучшая практика рекомендуется для сценариев с высокой пропускной способностью?
- Я использую серверную часть состояния RocksDB
/> - Обработка около 40 миллионов событий в день
- Объект DeviceData относительно небольшой (сериализовано ~500 байт)
Подробнее здесь: https://stackoverflow.com/questions/798 ... -in-instan
Мобильная версия