- жестко запрограммируйте startTime на определенное время (не используя timestamp.now()).
- укажите имя таблицы метаданных (чтобы не использовать автоматически созданный)
Однако, когда я остановил его, дал ему задержку на некоторое время (чтобы имитировать, когда мне нужно обновить свой конвейер), а затем запустил его снова, я получил это ошибка, и конвейер вообще не записал в мой gcs никаких json/текстовых файлов.
Сообщение об ошибке от работника: generic::invalid_argument: SDK сообщил об ошибке меньшее значение водяного знака: 2024-09-25T17:32:17.342+00:00, чем вычисленная нижняя граница: 2024-09-26T01:43:06.072+00:00
Сначала я подумал, что нужно реализовать оконную обработку с допустимой задержкой, поэтому я реализовал ее, добавив оконную обработку перед передачей ее в TextIO.write.
Код: Выделить всё
// apply windowing
var window = Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))
.withAllowedLateness(Duration.standardHours(24))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.discardingFiredPanes();
var windowedChangeStreamRecords = dataChangeRecords
.apply("Windowing", window);
Для контекста: я пытаюсь построить простой процесс приема данных Spanner в GCS (для целей аналитики).
Подробнее здесь: https://stackoverflow.com/questions/790 ... -got-error
Мобильная версия