Я пытаюсь вычислить скорость входящих событий в минуту из темы Kafka на основе времени события. Для этого я использую TumblingEventTimeWindows длительностью 1 минуту. Фрагмент кода приведен ниже. Я заметил, что если я не получаю никакого события для определенного окна, например. с 2.34 на 2.35, то к предыдущему окну 2.33 на 2.34 и близко не подобраться. Я понимаю риск потери данных для окна 2.33–2.34 (может произойти из-за сбоя системы, большего лага Kafka и т. д.), но я не могу ждать бесконечно. Мне нужно закрыть это окно после ожидания определенного периода времени, а последующие окна могут продолжить работу после восстановления системы. Как этого добиться?
Я пробую следующий код, который выдает количество событий в минуту для непрерывного потока событий.
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
));
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "AllEventCountConsumerGroup");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("event_input_topic", new SimpleStringSchema(), properties);
DataStreamSource kafkaDataStream = environment.addSource(kafkaConsumer);
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.getTimestamp()))
.keyBy((KeySelector) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.aggregate(new EventCountAggregator())
.addSink(eventRateProducer);
Подробнее здесь: https://stackoverflow.com/questions/654 ... -for-certa
Apache Flink: как закрыть окно фиксированного размера, если данные не получены в течение определенного периода времени ⇐ JAVA
Программисты JAVA общаются здесь
1730041788
Anonymous
Я пытаюсь вычислить скорость входящих событий в минуту из темы Kafka на основе времени события. Для этого я использую TumblingEventTimeWindows длительностью 1 минуту. Фрагмент кода приведен ниже. Я заметил, что если я не получаю никакого события для определенного окна, например. с 2.34 на 2.35, то к предыдущему окну 2.33 на 2.34 и близко не подобраться. Я понимаю риск потери данных для окна 2.33–2.34 (может произойти из-за сбоя системы, большего лага Kafka и т. д.), но я не могу ждать бесконечно. Мне нужно закрыть это окно после ожидания определенного периода времени, а последующие окна могут продолжить работу после восстановления системы. Как этого добиться?
Я пробую следующий код, который выдает количество событий в минуту для непрерывного потока событий.
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3,
org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)
));
executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
executionEnvironment.setParallelism(1);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "AllEventCountConsumerGroup");
FlinkKafkaConsumer kafkaConsumer = new FlinkKafkaConsumer("event_input_topic", new SimpleStringSchema(), properties);
DataStreamSource kafkaDataStream = environment.addSource(kafkaConsumer);
kafkaDataStream
.flatMap(new EventFlatter())
.filter(Objects::nonNull)
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner((SerializableTimestampAssigner) (element, recordTimestamp) -> element.getTimestamp()))
.keyBy((KeySelector) Entity::getTenant)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.allowedLateness(Time.seconds(10))
.aggregate(new EventCountAggregator())
.addSink(eventRateProducer);
Подробнее здесь: [url]https://stackoverflow.com/questions/65422206/apache-flink-how-to-close-a-fix-size-window-when-data-is-not-received-for-certa[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия