При перезапуске конвейера ChangeStream SpannerIO в GCS (TEXT/JSON) произошла ошибкаJAVA

Программисты JAVA общаются здесь
Ответить
Anonymous
 При перезапуске конвейера ChangeStream SpannerIO в GCS (TEXT/JSON) произошла ошибка

Сообщение Anonymous »

Я настроил этот конвейер: https://cloud.google.com/dataflow/docs/ ... ud-storage, автор:
  • жестко запрограммируйте startTime на определенное время (не используя timestamp.now()).
  • укажите имя таблицы метаданных (чтобы не использовать автоматически созданный)
Первый запуск прошел хорошо. Он создал DataChangeRecord в формате json для моей корзины gcs.
Однако, когда я остановил его, дал ему задержку на некоторое время (чтобы имитировать, когда мне нужно обновить свой конвейер), а затем запустил его снова, я получил это ошибка, и конвейер вообще не записал в мой gcs никаких json/текстовых файлов.

Сообщение об ошибке от работника: generic::invalid_argument: SDK сообщил об ошибке меньшее значение водяного знака: 2024-09-25T17:32:17.342+00:00, чем вычисленная нижняя граница: 2024-09-26T01:43:06.072+00:00

Сначала я подумал, что нужно реализовать оконную обработку с допустимой задержкой, поэтому я реализовал ее, добавив оконную обработку перед передачей ее в TextIO.write.

Код: Выделить всё

 // apply windowing
var window = Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))
.withAllowedLateness(Duration.standardHours(24))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.discardingFiredPanes();
var windowedChangeStreamRecords = dataChangeRecords
.apply("Windowing", window);
Однако проблема все еще сохраняется. Есть идеи, что с этим не так?
Для контекста: я пытаюсь построить простой процесс приема данных Spanner в GCS (для целей аналитики).

Подробнее здесь: https://stackoverflow.com/questions/790 ... -got-error
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «JAVA»