Вот моя работа с pyflink. Сколько бы я ни старался, мне не удалось получить водяные знаки заранее.
В результате я вижу следующее:
Event: (1, 1000), Current Watermark: -9223372036854775808
Event: (2, 2000), Current Watermark: -9223372036854775808
Event: (3, 3000), Current Watermark: -9223372036854775808
Event: (4, 4000), Current Watermark: -9223372036854775808
Event: (5, 5000), Current Watermark: -9223372036854775808
(1, 1000)
(2, 2000)
(3, 3000)
(4, 4000)
(5, 5000)
Как я понимаю, это означает, что водяные знаки застряли.
Когда я сделал то же самое в Java, я увидел продвижение водяных знаков, хотя я добавил Thread. Sleep(1000) между элементами. Я пробовал сделать то же самое в Python, но это тоже не сработало.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream.functions import ProcessFunction
from pyflink.common import Duration
# Custom TimestampAssigner
class CustomTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, element, record_timestamp):
return element[1]
# Custom ProcessFunction
class PrintWatermarkProcessFunction(ProcessFunction):
def process_element(self, value, ctx: ProcessFunction.Context):
current_watermark = ctx.timer_service().current_watermark()
print(f"Event: {value}, Current Watermark: {current_watermark}")
yield value # Forward event
# Execution Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# Create a data source with more events to better illustrate watermarks
events = [
(1, 1000),
(2, 2000),
(3, 3000),
(4, 4000),
(5, 5000)
]
source = env.from_collection(events)
# WatermarkStrategy with 1 second out-of-orderness
watermark_strategy = (
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(1))
.with_timestamp_assigner(CustomTimestampAssigner())
)
# Assign watermarks
watermarked_stream = source.assign_timestamps_and_watermarks(watermark_strategy)
# Print watermark progression
processed_stream = watermarked_stream.process(PrintWatermarkProcessFunction())
# Print the events
processed_stream.print()
# Execute the job
env.execute("Flink Job with Proper Watermark Emission")
Подробнее здесь: https://stackoverflow.com/questions/792 ... -are-stuck
Водяные знаки Pyflink застряли ⇐ Python
Программы на Python
1733006802
Anonymous
Вот моя работа с pyflink. Сколько бы я ни старался, мне не удалось получить водяные знаки заранее.
В результате я вижу следующее:
Event: (1, 1000), Current Watermark: -9223372036854775808
Event: (2, 2000), Current Watermark: -9223372036854775808
Event: (3, 3000), Current Watermark: -9223372036854775808
Event: (4, 4000), Current Watermark: -9223372036854775808
Event: (5, 5000), Current Watermark: -9223372036854775808
(1, 1000)
(2, 2000)
(3, 3000)
(4, 4000)
(5, 5000)
Как я понимаю, это означает, что водяные знаки застряли.
Когда я сделал то же самое в Java, я увидел продвижение водяных знаков, хотя я добавил Thread. Sleep(1000) между элементами. Я пробовал сделать то же самое в Python, но это тоже не сработало.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream.functions import ProcessFunction
from pyflink.common import Duration
# Custom TimestampAssigner
class CustomTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, element, record_timestamp):
return element[1]
# Custom ProcessFunction
class PrintWatermarkProcessFunction(ProcessFunction):
def process_element(self, value, ctx: ProcessFunction.Context):
current_watermark = ctx.timer_service().current_watermark()
print(f"Event: {value}, Current Watermark: {current_watermark}")
yield value # Forward event
# Execution Environment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
# Create a data source with more events to better illustrate watermarks
events = [
(1, 1000),
(2, 2000),
(3, 3000),
(4, 4000),
(5, 5000)
]
source = env.from_collection(events)
# WatermarkStrategy with 1 second out-of-orderness
watermark_strategy = (
WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(1))
.with_timestamp_assigner(CustomTimestampAssigner())
)
# Assign watermarks
watermarked_stream = source.assign_timestamps_and_watermarks(watermark_strategy)
# Print watermark progression
processed_stream = watermarked_stream.process(PrintWatermarkProcessFunction())
# Print the events
processed_stream.print()
# Execute the job
env.execute("Flink Job with Proper Watermark Emission")
Подробнее здесь: [url]https://stackoverflow.com/questions/79238038/pyflink-watermarks-are-stuck[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия