Apache Beam записывает ошибку полезной нагрузки bigquery jsonPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Apache Beam записывает ошибку полезной нагрузки bigquery json

Сообщение Anonymous »

Написание конвейера, который разбивает поток на таблицы, динамически именуемые по имени event_name и event_date в данных, в Dataflow.
Таблицы создаются с правильным именем, но данные не записываются, ссылаясь на ошибку форматирования ниже.
"Неизвестное имя "json" в 'rows[0]': поле Proto не повторяется, невозможно начать список"
Этап печати записи предоставляет этот журнал непосредственно перед вызовом writetobigquery — что мне кажется правильным:
О записи в BigQuery — таблица:
PROJECT_ID:DATASET_NAME.TABLE_NAME, запись: [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}]
(для ясности я попробовал удалить квадратные скобки, результат тот же)
Вот код конвейера

Код: Выделить всё

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import logging

def log_before_write(element):
table_name, record = element
logging.info(f"About to write to BigQuery - Table: {table_name}, Record: {record}")
return element

class SplitByParameter(beam.DoFn):
def process(self, element):
event_name = element['event_name']
event_date = element['event_date']
yield (event_name, event_date, element)

def format_table_name(element):
event_name, event_date, record = element
sanitized_event_name = event_name.replace(' ', '_')
sanitized_event_date = event_date.replace(' ', '_')
table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
return table_name, record

def split_records(element):
table_name, record = element

json_record = [{
'event_name': str(record.get('event_name', '')) if record.get('event_name') is not None else '',
'event_date': str(record.get('event_date', '')) if record.get('event_date') is not None else '',
'user_id': str(record.get('user_id', '')) if record.get('user_id') is not None else '',
'platform': str(record.get('platform', '')) if record.get('platform') is not None else ''
}]

yield (table_name,json_record)

def print_record(record):
logging.info(f"Record before WriteToBigQuery: {record}")
return record

def run(argv=None):
options = PipelineOptions(argv)
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)

# Define schema for BigQuery (this needs to match your record structure)
schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'

# Read from BigQuery, apply windowing, and process records
(p
| 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=f'''
SELECT *
FROM `PROJECT_ID.DATASET.TABLE`
WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
''', use_standard_sql=True)
| 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60))  # 60-second window
| 'SplitByParameter' >> beam.ParDo(SplitByParameter())  # Split by event_name and event_date
| 'FormatTableName' >> beam.Map(format_table_name)  # Format the table name
| 'LogBeforeFlatMap' >> beam.Map(lambda x: logging.info(f'Before FlatMap: {x}') or x)
| 'SplitRecords' >> beam.FlatMap(split_records)  # Convert record to desired format
| 'LogBeforeWrite' >> beam.Map(log_before_write)
| 'PrintRecord' >> beam.Map(print_record)  # Print records before writing to BigQuery
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
table=lambda x: x[0],  # Table name is the first element of the tuple
schema=schema,  # Use the schema defined above
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND  # Append data to existing tables
)
)

p.run()

if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()

Не знаю, куда двигаться дальше...
Я пробовал кучу разных форматирований, но существует очень много конфигураций, которые имеют шанс на работу.

Подробнее здесь: https://stackoverflow.com/questions/791 ... load-error
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Apache Beam записывает ошибку полезной нагрузки bigquery json
    Anonymous » » в форуме Python
    0 Ответы
    11 Просмотры
    Последнее сообщение Anonymous
  • Apache_beam записывает ошибку полезной нагрузки json большого запроса
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Do BigQuery.loadJobConfig () и File Loads Method of Apache Beam записать в метод BigQuery - это то же самое
    Anonymous » » в форуме Python
    0 Ответы
    23 Просмотры
    Последнее сообщение Anonymous
  • Почему мой конвейер потока данных Apache Beam не записывает данные в BigQuery?
    Anonymous » » в форуме Python
    0 Ответы
    20 Просмотры
    Последнее сообщение Anonymous
  • Почему мой конвейер потока данных Apache Beam не записывает данные в BigQuery?
    Anonymous » » в форуме Python
    0 Ответы
    18 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»