Таблицы создаются с правильным именем, но данные не записываются, ссылаясь на ошибку форматирования ниже.
"Неизвестное имя "json" в 'rows[0]': поле Proto не повторяется, невозможно начать список"
Этап печати записи предоставляет этот журнал непосредственно перед вызовом writetobigquery — что мне кажется правильным:
О записи в BigQuery — таблица:
PROJECT_ID:DATASET_NAME.TABLE_NAME, запись: [{'event_name': 'scroll', 'event_date': '20241118', 'user_id': '', 'platform': 'WEB'}]
(для ясности я попробовал удалить квадратные скобки, результат тот же)
Вот код конвейера
Код: Выделить всё
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.transforms.window import FixedWindows
import logging
def log_before_write(element):
table_name, record = element
logging.info(f"About to write to BigQuery - Table: {table_name}, Record: {record}")
return element
class SplitByParameter(beam.DoFn):
def process(self, element):
event_name = element['event_name']
event_date = element['event_date']
yield (event_name, event_date, element)
def format_table_name(element):
event_name, event_date, record = element
sanitized_event_name = event_name.replace(' ', '_')
sanitized_event_date = event_date.replace(' ', '_')
table_name = f'PROJECT_ID:DATASET.{sanitized_event_name}_{sanitized_event_date}'
return table_name, record
def split_records(element):
table_name, record = element
json_record = [{
'event_name': str(record.get('event_name', '')) if record.get('event_name') is not None else '',
'event_date': str(record.get('event_date', '')) if record.get('event_date') is not None else '',
'user_id': str(record.get('user_id', '')) if record.get('user_id') is not None else '',
'platform': str(record.get('platform', '')) if record.get('platform') is not None else ''
}]
yield (table_name,json_record)
def print_record(record):
logging.info(f"Record before WriteToBigQuery: {record}")
return record
def run(argv=None):
options = PipelineOptions(argv)
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)
# Define schema for BigQuery (this needs to match your record structure)
schema = 'event_name:STRING, event_date:STRING, user_id:STRING, platform:STRING'
# Read from BigQuery, apply windowing, and process records
(p
| 'ReadFromBigQuery' >> beam.io.ReadFromBigQuery(query=f'''
SELECT *
FROM `PROJECT_ID.DATASET.TABLE`
WHERE _TABLE_SUFFIX = FORMAT_TIMESTAMP('%Y%m%d', CURRENT_TIMESTAMP())
''', use_standard_sql=True)
| 'ApplyWindowing' >> beam.WindowInto(FixedWindows(60)) # 60-second window
| 'SplitByParameter' >> beam.ParDo(SplitByParameter()) # Split by event_name and event_date
| 'FormatTableName' >> beam.Map(format_table_name) # Format the table name
| 'LogBeforeFlatMap' >> beam.Map(lambda x: logging.info(f'Before FlatMap: {x}') or x)
| 'SplitRecords' >> beam.FlatMap(split_records) # Convert record to desired format
| 'LogBeforeWrite' >> beam.Map(log_before_write)
| 'PrintRecord' >> beam.Map(print_record) # Print records before writing to BigQuery
| 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
table=lambda x: x[0], # Table name is the first element of the tuple
schema=schema, # Use the schema defined above
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND # Append data to existing tables
)
)
p.run()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
Я пробовал кучу разных форматирований, но существует очень много конфигураций, которые имеют шанс на работу.
Подробнее здесь: https://stackoverflow.com/questions/791 ... load-error