Apache Flink: как закрыть окно фиксированного размера, если данные не получены в течение определенного периода времениJAVA

Программисты JAVA общаются здесь
Ответить Пред. темаСлед. тема
Anonymous
 Apache Flink: как закрыть окно фиксированного размера, если данные не получены в течение определенного периода времени

Сообщение Anonymous »

Я пытаюсь вычислить скорость входящих событий в минуту из темы Kafka на основе времени события. Для этого я использую TumblingEventTimeWindows длительностью 1 минуту. Фрагмент кода приведен ниже. Я заметил, что если я не получаю никакого события для определенного окна, например. с 2.34 на 2.35, то к предыдущему окну 2.33 на 2.34 и близко не подобраться. Я понимаю риск потери данных для окна 2.33–2.34 (может произойти из-за сбоя системы, большего лага Kafka и т. д.), но я не могу ждать бесконечно. Мне нужно закрыть это окно после ожидания определенного периода времени, а последующие окна могут продолжить работу после восстановления системы. Как этого добиться?
Я пробую следующий код, который выдает количество событий в минуту для непрерывного потока событий.
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
));
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "AllEventCountConsumerGroup");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("event_input_topic", new SimpleStringSchema(), properties);
DataStreamSource kafkaDataStream = environment.addSource(kafkaConsumer);
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.getTimestamp()))
.keyBy((KeySelector) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.aggregate(new EventCountAggregator())
.addSink(eventRateProducer);


Подробнее здесь: https://stackoverflow.com/questions/654 ... -for-certa
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «JAVA»