Затем, после выполнения keyby, она передает данные Дорис и Kafka.
Внутри операции keyby я использую ValueState только для хранения состояний. T также является обычным JavaBean. Однако размер контрольной точки продолжает увеличиваться.
Каждый раз, когда происходит обработка контрольной точки, она блокирует обработку программы.
Хотя я установил state.backend.async: true, кажется, что при обработке контрольных точек прием программой данных Kafka приостанавливается, что приводит к накоплению данных Kafka.
Когда я не использую RocksDB и вместо этого использую HashMap, возникает эта проблема исчезает.
Потом я провел тест. Я использовал простой ValueState, постоянно устанавливал для него значение true, а затем наблюдал за контрольной точкой. Его размер также продолжал увеличиваться.
Не могли бы вы рассказать мне, почему это происходит и как мне с этим справиться?
размер контрольной точки
Код: Выделить всё
public static void main(String[] args) throws Exception {
Configuration cfg = new Configuration();
cfg.setString("s3.access-key", "xx");
cfg.setString("s3.secret-key", "xxxx");
cfg.setString("s3.endpoint", "http://xxx:xxx");
cfg.setString("s3.path.style.access", "true");
// use rocksdb the checkpoint size always growing
cfg.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
// when use hashmap the checkpoint size fixed like 18.3kb
// cfg.set(StateBackendOptions.STATE_BACKEND, "hashmap");
FileSystem.initialize(cfg, null);
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironmentWithWebUI(cfg)
.setParallelism(1);
env.enableCheckpointing(1_000, CheckpointingMode.EXACTLY_ONCE);
env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
// env.setStateBackend(new HashMapStateBackend());
// env.getCheckpointConfig().setCheckpointStorage("file:///d:/.roy/.tmp/rocksdb/");
env.getCheckpointConfig().setCheckpointStorage("s3://flink-dev/cpsdev/");
env.configure(cfg);
env.addSource(new RichSourceFunction() {
private static final long serialVersionUID = 1L;
private volatile boolean isrunning;
@Override
public void run(SourceContext ctx) throws Exception {
isrunning = true;
while (isrunning) {
ctx.collect("aaa " + LocalDateTime.now() + " " + System.currentTimeMillis());
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isrunning = false;
}
})
.name("source")
.map(value -> {
String[] ss = value.split(" ");
return new Tuple3(ss[0], ss[1], Long.parseLong(ss[2]));
})
.name("map as tuple2")
.returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG))
//.assignTimestampsAndWatermarks(WatermarkStrategy
// .forBoundedOutOfOrderness(Duration.ofSeconds(10))
// .withTimestampAssigner((e, t) -> e.f2))
.keyBy(e -> e.f0)
.process(new KeyedProcessFunction() {
private static final long serialVersionUID = 1L;
private ValueState state;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
ValueStateDescriptor vd = new ValueStateDescriptor("value", Types.BOOLEAN);
// the TTL config not work
// StateTtlConfig ttl = StateTtlConfig.newBuilder(Time.ofSeconds(2))
// .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
// .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
// .cleanupInRocksdbCompactFilter(1000L)
// .cleanupIncrementally(10, true)
// .build();
// vd.enableTimeToLive(ttl);
state = getRuntimeContext().getState(vd);
}
@Override
public void processElement(Tuple3 value, Context ctx, Collector out) throws Exception {
for (int i = 0; i < 10000; i ++) {
// remove this the checkpoint size fixed.
state.update(true);
}
out.collect(value);
}
})
.name("process keyby")
.print()
.name("print>>")
;
env.execute("t4");
}
Подробнее здесь: https://stackoverflow.com/questions/791 ... th-rocksdb
Мобильная версия