Apache Flink 1.18. Лучшая практика доступа к ValueState: несколько вызовов value() и кэширование в переменной экземпляраJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Apache Flink 1.18. Лучшая практика доступа к ValueState: несколько вызовов value() и кэширование в переменной экземпляра

Сообщение Anonymous »

Я работаю с Apache Flink 1.18, и у меня есть вопрос о наилучшей практике доступа к ValueState с точки зрения производительности.
У меня есть KeyedProcessFunction, которая управляет состоянием устройства через класс-оболочку. Доступ к состоянию осуществляется несколько раз с помощью разных методов, и мне интересно, какой подход более эффективен:
Вариант 1: вызывать value() каждый раз, когда это необходимо

Код: Выделить всё

public class DeviceStateManager {
private final ValueState deviceState;

public DeviceStateManager(ValueState deviceState) {
this.deviceState = deviceState;
}

public void updateMetrics(Measurement measurement) throws Exception {
if (deviceState.value() != null) {
DeviceData data = deviceState.value();
data.addMeasurement(measurement);
deviceState.update(data);
}
}

public boolean isAnomalyDetected(double threshold) throws Exception {
return deviceState.value() != null &&
deviceState.value().getAverage() > threshold;
}

public Alert generateAlert() throws Exception {
if (deviceState.value() == null) return null;
return new Alert(
deviceState.value().getDeviceId(),
deviceState.value().getLastValue(),
deviceState.value().getTimestamp()
);
}
}

public class IoTProcessFunction extends KeyedProcessFunction {
private ValueStateDescriptor stateDescriptor;
private transient DeviceStateManager stateManager;

@Override
public void open(Configuration parameters) {
stateDescriptor = new ValueStateDescriptor("device-state", DeviceData.class);
}

@Override
public void processElement(Event event, Context ctx, Collector out) throws Exception {
stateManager = new DeviceStateManager(getRuntimeContext().getState(stateDescriptor));

stateManager.updateMetrics(event.getMeasurement());

if (stateManager.isAnomalyDetected(100.0)) {
Alert alert = stateManager.generateAlert();
out.collect(alert);
}
}
}
Вариант 2. Кэшируйте значение в переменной экземпляра

Код: Выделить всё

public class DeviceStateManager {
private final ValueState deviceState;
private DeviceData cachedValue;

public DeviceStateManager(ValueState deviceState) throws Exception {
this.deviceState = deviceState;
this.cachedValue = deviceState.value();  // Load once
}

public void updateMetrics(Measurement measurement) throws Exception {
if (cachedValue != null) {
cachedValue.addMeasurement(measurement);
deviceState.update(cachedValue);
}
}

public boolean isAnomalyDetected(double threshold) {
return cachedValue != null && cachedValue.getAverage() > threshold;
}

public Alert generateAlert() {
if (cachedValue == null) return null;
return new Alert(
cachedValue.getDeviceId(),
cachedValue.getLastValue(),
cachedValue.getTimestamp()
);
}
}

// ProcessFunction remains the same
Мои опасения:
  • Выполняет ли ValueState.value() десериализацию при каждом вызове или значение уже кэшируется внутри системы управления состоянием Flink?
  • В варианте 1 я вызываю метод deviceState.value() несколько раз (в isAnomalyDetected() он вызывается дважды, в генерироватьAlert() четыре раза). Есть ли снижение производительности?
  • В варианте 2 я кэширую значение, но действительно ли это необходимо или я дублирую то, что Flink уже делает внутри себя?
  • Какая лучшая практика рекомендуется для сценариев с высокой пропускной способностью?
Дополнительный контекст:
  • Я использую серверную часть состояния RocksDB
    />
  • Обработка около 40 миллионов событий в день
  • Объект DeviceData относительно небольшой (сериализовано ~500 байт)
Какой подход мне следует предпочесть для оптимальной производительности?


Подробнее здесь: https://stackoverflow.com/questions/798 ... alue-calls
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»