Здравствуйте, я пытаюсь изучить и изучить луч Apache.
Ниже приведен мой сценарий луча Apache, который запускает сценарии, зависящие от последовательности обновлений. Я получаю следующую ошибку от bigquery
Не удалось сериализовать доступ к таблице dwingestion:api_staging.apache_beam_bms_latest_data из-за одновременного обновления
Я пробовал разные способы, включая расширение окна, а также разделение обновлений на разные конвейеры, но ошибка все равно сохраняется. Есть ли способ сделать скрипт более стабильным для запуска обновлений? последовательно без одновременного обновления таблицы?
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dwingestion-b033d9535e9d.json"
# GCP project and table configurations
PROJECT_ID = 'dwingestion'
DATASET_ID = 'api_staging'
STAGING_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_data_streaming'
LIVE_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_latest_data'
# SQL query to fetch data from the staging table
STAGING_QUERY = f"""
SELECT *
FROM `{STAGING_TABLE}`
"""
class MergeStagingToLiveTable(beam.DoFn):
"""
Perform the MERGE operation from the staging table to the live table.
"""
def __init__(self, project_id, staging_table, live_table):
self.project_id = project_id
self.staging_table = staging_table
self.live_table = live_table
def setup(self):
self.bq_client = bigquery.Client(project=self.project_id)
def process(self, element):
# Perform the MERGE query
merge_query = """
MERGE `dwingestion.api_staging.apache_beam_bms_latest_data` AS live USING (
SELECT
#...the rest of the query """
query_job = self.bq_client.query(merge_query)
query_job.result()
yield "MERGE query completed."
class CreateOrReplaceLiveTable(beam.DoFn):
"""
Perform the CREATE OR REPLACE operation on the live table.
"""
def __init__(self, project_id, live_table, final_table):
self.project_id = project_id
self.live_table = live_table
self.final_table = final_table
def setup(self):
self.bq_client = bigquery.Client(project=self.project_id)
def process(self, element):
# Perform the CREATE OR REPLACE query
create_replace_query = """
CREATE OR REPLACE TABLE `dwingestion.api.apache_beam_bms_live` AS
WITH with_country AS (
SELECT
* ,
'UG' AS country
FROM
`dwingestion.api_staging.apache_beam_bms_latest_data`
),
# ...the rest of the query
"""
query_job = self.bq_client.query(create_replace_query)
query_job.result()
def run_pipeline():
"""
Main function to set up and execute the pipeline.
"""
pipeline_options = PipelineOptions(
project='dwingestion',
runner='DataflowRunner',
streaming=True,
temp_location='gs://bodawork_dataflow_template/temp',
staging_location='gs://bodawork_dataflow_template/staging',
region='europe-west1',
job_name='flesp-upsert-streaming-pipeline-dataflow',
save_main_session=True
)
pipeline_options.view_as(StandardOptions).streaming = True
# Apache Beam pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| 'Read from Staging Table' >> beam.io.ReadFromBigQuery(query=STAGING_QUERY,
use_standard_sql=True,
method="DIRECT_READ")
# Step 1: Windowing before the first ParDo
| 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60)) # 60-second window
| 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
project_id=PROJECT_ID,
staging_table=STAGING_TABLE,
live_table=LIVE_TABLE
))
)
with beam.Pipeline() as pipeline_2:
(
pipeline_2
| 'Window for Create/Replace' >> beam.WindowInto(beam.window.FixedWindows(300))
| 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
project_id=PROJECT_ID,
live_table=LIVE_TABLE,
final_table='dwingestion.api.apache_beam_bms_live'
))
)
if __name__ == "__main__":
run_pipeline()
Подробнее здесь: https://stackoverflow.com/questions/792 ... scripts-in
Как лучше всего избежать одновременного обновления таблицы при запуске сценариев Bigquery в лучевом конвейере Apache? ⇐ Python
Программы на Python
1733407925
Anonymous
Здравствуйте, я пытаюсь изучить и изучить луч Apache.
Ниже приведен мой сценарий луча Apache, который запускает сценарии, зависящие от последовательности обновлений. Я получаю следующую ошибку от bigquery
[b]Не удалось сериализовать доступ к таблице dwingestion:api_staging.apache_beam_bms_latest_data из-за одновременного обновления[/b]
Я пробовал разные способы, включая расширение окна, а также разделение обновлений на разные конвейеры, но ошибка все равно сохраняется. Есть ли способ сделать скрипт более стабильным для запуска обновлений? последовательно без одновременного обновления таблицы?
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.cloud import bigquery
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "dwingestion-b033d9535e9d.json"
# GCP project and table configurations
PROJECT_ID = 'dwingestion'
DATASET_ID = 'api_staging'
STAGING_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_data_streaming'
LIVE_TABLE = f'{PROJECT_ID}.{DATASET_ID}.apache_beam_bms_latest_data'
# SQL query to fetch data from the staging table
STAGING_QUERY = f"""
SELECT *
FROM `{STAGING_TABLE}`
"""
class MergeStagingToLiveTable(beam.DoFn):
"""
Perform the MERGE operation from the staging table to the live table.
"""
def __init__(self, project_id, staging_table, live_table):
self.project_id = project_id
self.staging_table = staging_table
self.live_table = live_table
def setup(self):
self.bq_client = bigquery.Client(project=self.project_id)
def process(self, element):
# Perform the MERGE query
merge_query = """
MERGE `dwingestion.api_staging.apache_beam_bms_latest_data` AS live USING (
SELECT
#...the rest of the query """
query_job = self.bq_client.query(merge_query)
query_job.result()
yield "MERGE query completed."
class CreateOrReplaceLiveTable(beam.DoFn):
"""
Perform the CREATE OR REPLACE operation on the live table.
"""
def __init__(self, project_id, live_table, final_table):
self.project_id = project_id
self.live_table = live_table
self.final_table = final_table
def setup(self):
self.bq_client = bigquery.Client(project=self.project_id)
def process(self, element):
# Perform the CREATE OR REPLACE query
create_replace_query = """
CREATE OR REPLACE TABLE `dwingestion.api.apache_beam_bms_live` AS
WITH with_country AS (
SELECT
* ,
'UG' AS country
FROM
`dwingestion.api_staging.apache_beam_bms_latest_data`
),
# ...the rest of the query
"""
query_job = self.bq_client.query(create_replace_query)
query_job.result()
def run_pipeline():
"""
Main function to set up and execute the pipeline.
"""
pipeline_options = PipelineOptions(
project='dwingestion',
runner='DataflowRunner',
streaming=True,
temp_location='gs://bodawork_dataflow_template/temp',
staging_location='gs://bodawork_dataflow_template/staging',
region='europe-west1',
job_name='flesp-upsert-streaming-pipeline-dataflow',
save_main_session=True
)
pipeline_options.view_as(StandardOptions).streaming = True
# Apache Beam pipeline
with beam.Pipeline(options=pipeline_options) as pipeline:
(
pipeline
| 'Read from Staging Table' >> beam.io.ReadFromBigQuery(query=STAGING_QUERY,
use_standard_sql=True,
method="DIRECT_READ")
# Step 1: Windowing before the first ParDo
| 'Window for Merge' >> beam.WindowInto(beam.window.FixedWindows(60)) # 60-second window
| 'Trigger Merge' >> beam.ParDo(MergeStagingToLiveTable(
project_id=PROJECT_ID,
staging_table=STAGING_TABLE,
live_table=LIVE_TABLE
))
)
with beam.Pipeline() as pipeline_2:
(
pipeline_2
| 'Window for Create/Replace' >> beam.WindowInto(beam.window.FixedWindows(300))
| 'Trigger Create/Replace' >> beam.ParDo(CreateOrReplaceLiveTable(
project_id=PROJECT_ID,
live_table=LIVE_TABLE,
final_table='dwingestion.api.apache_beam_bms_live'
))
)
if __name__ == "__main__":
run_pipeline()
Подробнее здесь: [url]https://stackoverflow.com/questions/79255033/how-best-to-avoid-concurrent-update-on-a-table-when-running-bigquery-scripts-in[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия