Apache Beam, создание схемы для WriteToJdbc в Python SDKPython

Программы на Python
Ответить
Anonymous
 Apache Beam, создание схемы для WriteToJdbc в Python SDK

Сообщение Anonymous »

Я пытаюсь создать конвейер для чтения из BigQuery и записи в таблицу MS SQL. Чтение из фазы BigQuery в порядке, но запись в MS SQL через WriteToJdbc невозможна из-за схемы.
Я хочу выполнить запрос, поэтому не могу использовать ReadFromBigQuery(table=".. .", output_type='BEAM_ROW'). Даже если я могу использовать BEAM_ROW, конвейеру требуется схема после шага beam.Partition.
Я получал сообщение «Невозможно вызвать getSchema, когда схемы нет» ошибка перед использованием with_output_types.
Все еще работает неправильно. Итак, теперь я получаю сообщение об ошибке NameError: имя 'Gdk' не определено.
Я исследовал документацию Beam Schema, моя конечная цель - сделать мой конвейер динамическим, поэтому я не хочу использовать Select для получения схема.
Также я попробовал Row(), но получил ошибку NameError: имя «Строка» не определено. Вы имели в виду: «pow»?
Попробовал много вещей, но все еще не могу использовать код для передачи данных из BQ в MSSQL. Что мне делать?
Мой полный код:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
import random
from typing import NamedTuple
from apache_beam import Row # Also tried from apache_beam.pvalue import Row

def create_pipeline_options(project, region, job_name, temp_location, staging_location):
options = PipelineOptions() # GoogleCloudOptions, WorkerOptions
return options

class Gdk(NamedTuple): # Also tried TypedDict
column1: int
column2: int
column3: float
column4: str

def run_pipeline():
project = "dwh-dev"
region = "europe-west3"
job_name = f"pipeline2o{random.randint(1, 100)}"
temp_location = "gs://dataflow-dev..."
staging_location = "gs://dataflow-dev..."

# JDBC connection details
jdbc_table = "table_name"
jdbc_driver_class = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_url = "jdbc:sqlserver://0.0.0.0:1433..."
jdbc_user = "user"
jdbc_password = "password"

# Create the pipeline
options = create_pipeline_options(project, region, job_name, temp_location, staging_location)
p = beam.Pipeline(options=options)
beam.coders.registry.register_coder(Gdk, beam.coders.RowCoder)

# BigQuery query to read data
query = "SELECT column1, column2, column3, column4 FROM `dwh-dev.datamart.vw_stocksales`"

def mod_fn(x, ln):
return x['column1'] % 4

def gdk_to_dict(gdk) -> dict: # Also tried out of the run_pipeline function
return {
"column1": gdk["column1"],
"column2": gdk["column2"],
"column3": gdk["column3"],
"column4": gdk["column4"]
}

partitions = (
p | "ReadFromBigQuery" >> ReadFromBigQuery(query=query, use_standard_sql=True)
| "PartitionData" >> beam.Partition(mod_fn, 4)
)

for i, partition in enumerate(partitions):
(
partition
| f"ConvertToRow-{i}" >> beam.Map(lambda data: Gdk(column1=data["column1"], # Also tried gdk_to_dict function to convert Gdk to dict
column2=data["column2"],
column3=data["column3"],
column4=data["column4"])).with_output_types(Gdk)
# This returns error: NameError: name 'Row' is not defined. Did you mean: 'pow'?
# | f"ConvertToRow-{i}" >> beam.Map(lambda data: Row(column1=data["column1"],
# column2=data["column2"],
# column3=data["column3"],
# column4=data["column4"])).with_output_types(Gdk)
| f"WriteToJdbc-{i}" >> WriteToJdbc(
table_name=jdbc_table,
driver_class_name=jdbc_driver_class,
jdbc_url=jdbc_url,
username=jdbc_user,
password=jdbc_password,
)
)

p.run()

if __name__ == "__main__":
run_pipeline()


Подробнее здесь: https://stackoverflow.com/questions/793 ... python-sdk
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»