Код: Выделить всё
gs://my-bucket/temp/bq_load/...
Вот структура конвейера:
Код: Выделить всё
worker_options.sdk_container_image = '...'
with beam.Pipeline(options=pipeline_options) as p:
processed_data = (
p
| "ReadFiles" >> beam.Create(FILE_LIST)
| "ProcessFiles" >> beam.ParDo(ProcessAvroFileDoFn())
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
table=f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}",
schema=BQ_SCHEMA,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
)
- Конвейер успешно работает с DirectRunner, записывая данные в BigQuery без каких-либо проблем.
При использовании DataflowRunner конвейер завершается без ошибок и предупреждений, но:
- Строки не записываются в BigQuery.
- Большие временные файлы остаются в корзине (например, bq_load/...). - Обрабатываемые данные действительны в формате NDJSON.
- Схема BigQuery соответствует структуре данных.
- Проверяя оставшиеся временные файлы, я скачал temp и проверил, что он содержит допустимые строки NDJSON. Загрузка этого файла в BigQuery вручную с помощью команды bq load работает нормально.
- Тестирование с другими наборами данных:
Я пробовал много разных входных данных. , но проблема сохраняется. - Проверка журналов потока данных:
Я просмотрел журналы в консоли мониторинга потока данных, но не обнаружил ошибок. или предупреждения.
Подробнее здесь: https://stackoverflow.com/questions/793 ... o-bigquery