Несоответствие окна Apache Beam между DirectRunner и DataflowRunnerPython

Программы на Python
Ответить
Anonymous
 Несоответствие окна Apache Beam между DirectRunner и DataflowRunner

Сообщение Anonymous »

У меня есть тестовый набор данных для создания элементов с отметками времени в течение следующих 20 минут. Я хочу создать фиксированные окна продолжительностью 1 минуту, а затем каждую минуту создавать скользящее окно продолжительностью 15 минут.

Код: Выделить всё

n = 20
DURATION = 20 * 60
event_times = np.random.randint(1, DURATION + 1, n)
events = []
offset = 1_000_000_000
for i in range(n):
events.append(offset + int(event_times[i]))
Я вижу, что OUTPUT_AT_EOW в 15-минутном скользящем окне ведет себя по-разному в DataflowRunner и DirectRunner.

Код: Выделить всё

with beam.Pipeline(options=pipeline_options) as p:
output = (
p
| "Create Timestamps" >> beam.Create([beam.window.TimestampedValue(f"Value {i}", i) for i in events])
| "AssignWindow" >> beam.WindowInto(beam.window.FixedWindows(60),
timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EARLIEST)
| "Assign15MinuteSlidingWindow" >> beam.WindowInto(
beam.window.SlidingWindows(15 * 60, 60),
timestamp_combiner=beam.window.TimestampCombiner.OUTPUT_AT_EOW,
trigger=beam.trigger.AfterWatermark(),
accumulation_mode=beam.trigger.AccumulationMode.DISCARDING,
allowed_lateness=0,
)
| "Add Dummy Key" >> beam.Map(lambda x: ('key', x))
| "Count Elements" >> beam.CombinePerKey(beam.combiners.CountCombineFn())
| "Print Elements with Timestamps" >> beam.ParDo(PrintWithTimestampsAndWindow())
| 'Write' >> WriteToText(known_args.output)
)
Расхождение, наблюдаемое между двумя бегунами (с использованием EOW), выглядит, как показано ниже (для краткости результат сокращен).
DirectRunner правильно агрегирует уникальные окна (из PrintWithTimestampsAndWindow())

Код: Выделить всё

('key', 13) @ 2001-09-09T01:55:59.999000Z in window [2001-09-09T01:41:00Z, 2001-09-09T01:56:00Z)
DataflowRunner имеет только окна с небольшим количеством элементов (

Подробнее здесь: https://stackoverflow.com/questions/793 ... flowrunner
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»