Код: Выделить всё
this.watermarkStrategy = WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
Код: Выделить всё
DataStream mainStream = env.readTextFile(csvFilePath)
.map(new MapFunction() {
@Override
public EventBasic map(String line) throws Exception {
String[] parts = line.split(",");
if (parts.length == 3) {
String key = parts[0];
int valueInt = Integer.parseInt(parts[1]);
long valueTimeStamp = Long.parseLong(parts[2]);
return new EventBasic(key, valueInt, valueTimeStamp);
} else {
return null;
}
}
}).setParallelism(3).assignTimestampsAndWatermarks(watermarkStrategy).name("source");
Код: Выделить всё
key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500
при немедленном наблюдении (я создал фиктивную функцию процесса, чтобы наблюдать, что мои временные метки работают) я наблюдаю значение -9223372036854775808< /code> который постоянно. Это означает, что генератор водяных знаков не знает, когда добавлять новый водяной знак.
Я также попробовал следующую стратегию использования водяных знаков, которая привела к тому же результату:
Код: Выделить всё
this.watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMillis(500))
.withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
Подробнее здесь: https://stackoverflow.com/questions/784 ... g-in-flink