Как лучше всего избежать одновременного обновления таблицы при запуске сценариев Bigquery в лучевом конвейере Apache?Python

Программы на Python
Ответить
Anonymous
 Как лучше всего избежать одновременного обновления таблицы при запуске сценариев Bigquery в лучевом конвейере Apache?

Сообщение Anonymous »

Здравствуйте, я пытаюсь изучить и изучить луч Apache.
Ниже приведен мой сценарий луча Apache, который запускает сценарии, зависящие от последовательности обновлений. Я получаю следующую ошибку от bigquery
Не удалось сериализовать доступ к таблице dwingestion:api_staging.apache_beam_bms_latest_data из-за одновременного обновления
Я пробовал разные способы, включая расширение окна, а также разделение обновлений на разные конвейеры, но ошибка все равно сохраняется. Есть ли способ сделать скрипт более стабильным для запуска обновлений? последовательно без одновременного обновления таблицы?
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dwingestion-b033d9535e9d.json"

# GCP project and table configurations
PROJECT_ID = 'dwingestion'
DATASET_ID = 'api_staging'

STAGING_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_data_streaming'
LIVE_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_latest_data'

# SQL query to fetch data from the staging table
STAGING_QUERY = f"""
SELECT *
FROM `{STAGING_TABLE}`
"""

class MergeStagingToLiveTable(beam.DoFn):
"""
Perform the MERGE operation from the staging table to the live table.
"""

def __init__(self, project_id, staging_table, live_table):
self.project_id = project_id
self.staging_table = staging_table
self.live_table = live_table

def setup(self):
self.bq_client = bigquery.Client(project=self.project_id)

def process(self, element):
# Perform the MERGE query
merge_query = """
MERGE `dwingestion.api_staging.apache_beam_bms_latest_data` AS live USING (
SELECT
#...the rest of the query """

query_job = self.bq_client.query(merge_query)
query_job.result()
yield "MERGE query completed."

class CreateOrReplaceLiveTable(beam.DoFn):
"""
Perform the CREATE OR REPLACE operation on the live table.
"""

def __init__(self, project_id, live_table, final_table):
self.project_id = project_id
self.live_table = live_table
self.final_table = final_table

def setup(self):
self.bq_client = bigquery.Client(project=self.project_id)

def process(self, element):
# Perform the CREATE OR REPLACE query
create_replace_query = """
CREATE OR REPLACE TABLE `dwingestion.api.apache_beam_bms_live` AS
WITH with_country AS (
SELECT
* ,
'UG' AS country
FROM
`dwingestion.api_staging.apache_beam_bms_latest_data`
),
# ...the rest of the query
"""
query_job = self.bq_client.query(create_replace_query)
query_job.result()

def run_pipeline():
"""
Main function to set up and execute the pipeline.
"""
pipeline_options = PipelineOptions(
project='dwingestion',
runner='DataflowRunner',
streaming=True,
temp_location='gs://bodawork_dataflow_template/temp',
staging_location='gs://bodawork_dataflow_template/staging',
region='europe-west1',
job_name='flesp-upsert-streaming-pipeline-dataflow',
save_main_session=True
)
pipeline_options.view_as(StandardOptions).streaming = True

# Apache Beam pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| 'Read from Staging Table' >> beam.io.ReadFromBigQuery(query=STAGING_QUERY,
use_standard_sql=True,
method="DIRECT_READ")
# Step 1: Windowing before the first ParDo
| 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60)) # 60-second window
| 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
project_id=PROJECT_ID,
staging_table=STAGING_TABLE,
live_table=LIVE_TABLE
))

)
with beam.Pipeline() as pipeline_2:
(
pipeline_2

| 'Window for Create/Replace' >> beam.WindowInto(beam.window.FixedWindows(300))
| 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
project_id=PROJECT_ID,
live_table=LIVE_TABLE,
final_table='dwingestion.api.apache_beam_bms_live'
))
)

if __name__ == "__main__":
run_pipeline()


Подробнее здесь: https://stackoverflow.com/questions/792 ... scripts-in
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»