Почему размер контрольной точки Flink всегда растет с RockDBJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 Почему размер контрольной точки Flink всегда растет с RockDB

Сообщение Anonymous »

У меня есть программа обработки Flink, которая получает данные от Kafka.
Затем, после выполнения keyby, она передает данные Дорис и Kafka.
Внутри операции keyby я использую ValueState только для хранения состояний. T также является обычным JavaBean. Однако размер контрольной точки продолжает увеличиваться.
Каждый раз, когда происходит обработка контрольной точки, она блокирует обработку программы.
Хотя я установил state.backend.async: true, кажется, что при обработке контрольных точек прием программой данных Kafka приостанавливается, что приводит к накоплению данных Kafka.
Когда я не использую RocksDB и вместо этого использую HashMap, возникает эта проблема исчезает.
Потом я провел тест. Я использовал простой ValueState, постоянно устанавливал для него значение true, а затем наблюдал за контрольной точкой. Его размер также продолжал увеличиваться.
Не могли бы вы рассказать мне, почему это происходит и как мне с этим справиться?
размер контрольной точки

Код: Выделить всё


public static void main(String[] args) throws Exception {
Configuration cfg = new Configuration();
cfg.setString("s3.access-key", "xx");
cfg.setString("s3.secret-key", "xxxx");
cfg.setString("s3.endpoint", "http://xxx:xxx");
cfg.setString("s3.path.style.access", "true");
// use rocksdb the checkpoint size always growing
cfg.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
// when use hashmap the checkpoint size fixed like 18.3kb
// cfg.set(StateBackendOptions.STATE_BACKEND, "hashmap");

FileSystem.initialize(cfg, null);
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(cfg)
.setParallelism(1);
env.enableCheckpointing(1_000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
//        env.setStateBackend(new HashMapStateBackend());
//        env.getCheckpointConfig().setCheckpointStorage("file:///d:/.roy/.tmp/rocksdb/");
env.getCheckpointConfig().setCheckpointStorage("s3://flink-dev/cpsdev/");
env.configure(cfg);

env.addSource(new RichSourceFunction() {
private static final long serialVersionUID = 1L;
private volatile boolean isrunning;

@Override
public void run(SourceContext ctx) throws Exception {
isrunning = true;
while (isrunning) {
ctx.collect("aaa " + LocalDateTime.now() + " " + System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
}
}

@Override
public void cancel() {
isrunning = false;
}
})
.name("source")
.map(value -> {
String[] ss = value.split(" ");
return new Tuple3(ss[0], ss[1], Long.parseLong(ss[2]));
})
.name("map as tuple2")
.returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
//.assignTimestampsAndWatermarks(WatermarkStrategy
//        .forBoundedOutOfOrderness(Duration.ofSeconds(10))
//        .withTimestampAssigner((e, t) -> e.f2))
.keyBy(e -> e.f0)
.process(new KeyedProcessFunction() {
private static final long serialVersionUID = 1L;
private ValueState state;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor vd = new ValueStateDescriptor("value", Types.BOOLEAN);
// the TTL config not work
//                StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.ofSeconds(2))
//                        .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
//                        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
//                        .cleanupInRocksdbCompactFilter(1000L)
//                        .cleanupIncrementally(10, true)
//                        .build();
//                vd.enableTimeToLive(ttl);
state = getRuntimeContext().getState(vd);
}

@Override
public void processElement(Tuple3 value, Context ctx, Collector out) throws Exception {
for (int i = 0; i < 10000;  i ++) {
// remove this the checkpoint size fixed.
state.update(true);
}
out.collect(value);
}
})
.name("process keyby")
.print()
.name("print>>")
;
env.execute("t4");

}

Я попытался поместить TimerService в кучу и настроил параметры RocksDB.


Подробнее здесь: https://stackoverflow.com/questions/791 ... th-rocksdb
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»