Код: Выделить всё
# Generate `n` random integer event timestamps for the windowing demo.
n = 20  # number of synthetic events
DURATION = 20 * 60  # width of the range the events are drawn from (presumably 20 minutes, in seconds)

# Base value added to every event time; the results are later used as
# element timestamps, so this presumably is seconds since the Unix epoch.
offset = 1_000_000_000

# randint's upper bound is exclusive, so offsets fall in [1, DURATION].
event_times = np.random.randint(1, DURATION + 1, n)

# Convert NumPy integers to plain Python ints before shifting by `offset`.
events = [offset + int(t) for t in event_times]
Код: Выделить всё
# Count the timestamped events per window and write the formatted results.
with beam.Pipeline(options=pipeline_options) as p:
    output = (
        p
        # Each event value doubles as the element's own timestamp.
        | "Create Timestamps" >> beam.Create(
            [beam.window.TimestampedValue(f"Value {ts}", ts) for ts in events]
        )
        # First windowing step: 60-unit fixed windows.
        | "AssignWindow" >> beam.WindowInto(
            beam.window.FixedWindows(60),
            timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EARLIEST,
        )
        # Second windowing step: sliding windows of size 15 * 60 with period 60.
        # NOTE(review): this WindowInto directly follows the previous one with
        # no aggregation in between; presumably it replaces the fixed-window
        # assignment rather than composing with it -- confirm this is intended.
        | "Assign15MinuteSlidingWindow" >> beam.WindowInto(
            beam.window.SlidingWindows(15 * 60, 60),
            timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW,
            trigger=beam.trigger.AfterWatermark(),
            accumulation_mode=beam.trigger.AccumulationMode.DISCARDING,
            allowed_lateness=0,
        )
        # A single constant key so CombinePerKey yields one count per window.
        | "Add Dummy Key" >> beam.Map(lambda x: ("key", x))
        | "Count Elements" >> beam.CombinePerKey(beam.combiners.CountCombineFn())
        | "Print Elements with Timestamps" >> beam.ParDo(PrintWithTimestampsAndWindow())
        | 'Write' >> WriteToText(known_args.output)
    )
DirectRunner правильно агрегирует элементы по уникальным окнам (вывод PrintWithTimestampsAndWindow()):
Код: Выделить всё
('key', 13) @ 2001-09-09T01:55:59.999000Z in window [2001-09-09T01:41:00Z, 2001-09-09T01:56:00Z)
Подробнее здесь: https://stackoverflow.com/questions/793 ... flowrunner
Мобильная версия