Водяные знаки не увеличиваются при миганииJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Водяные знаки не увеличиваются при мигании

Сообщение Anonymous »

поэтому я пытаюсь создать свою собственную схему окон, используя unkeyedprocessFunctions. Я использую источник и хочу использовать водяные знаки. Моя текущая реализация водяных знаков следующая:

Код: Выделить всё

this.watermarkStrategy = WatermarkStrategy
.forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
Я создал свой источник следующим образом

Код: Выделить всё

DataStream mainStream = env.readTextFile(csvFilePath)
.map(new MapFunction() {
@Override
public EventBasic map(String line) throws Exception {
String[] parts = line.split(",");
if (parts.length == 3) {
String key = parts[0];
int valueInt = Integer.parseInt(parts[1]);
long valueTimeStamp = Long.parseLong(parts[2]);
return new EventBasic(key, valueInt, valueTimeStamp);
} else {
return null;
}
}
}).setParallelism(3).assignTimestampsAndWatermarks(watermarkStrategy).name("source");
эта исходная функция считывает файл CSV следующего формата:

Код: Выделить всё

key,val,timestamp
A,0,500
C,1,500
A,2,500
A,3,500
A,4,500
B,5,500
A,6,500
H,7,500
...
a,100,1500
Поскольку временные метки монотонно увеличиваются
при немедленном наблюдении (я создал фиктивную функцию процесса, чтобы наблюдать, что мои временные метки работают) я наблюдаю значение -9223372036854775808< /code> который постоянно. Это означает, что генератор водяных знаков не знает, когда добавлять новый водяной знак.
Я также попробовал следующую стратегию использования водяных знаков, которая привела к тому же результату:

Код: Выделить всё

this.watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofMillis(500))
.withTimestampAssigner((element, recordTimestamp) -> element.value.timeStamp);
Я не знаю, в чем может быть моя проблема, я пробовал искать везде, но ничего не изменилось.

Подробнее здесь: https://stackoverflow.com/questions/784 ... g-in-flink
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»