Я пытаюсь создать конвейер для чтения из BigQuery и записи в таблицу MS SQL. Чтение из фазы BigQuery в порядке, но запись в MS SQL через WriteToJdbc невозможна из-за схемы.
Я хочу выполнить запрос, поэтому не могу использовать ReadFromBigQuery(table=".. .", output_type='BEAM_ROW'). Даже если я могу использовать BEAM_ROW, конвейеру требуется схема после шага beam.Partition.
Я получал сообщение «Невозможно вызвать getSchema, когда схемы нет» ошибка перед использованием with_output_types.
Все еще работает неправильно. Итак, теперь я получаю сообщение об ошибке NameError: имя 'Gdk' не определено.
Я исследовал документацию Beam Schema, моя конечная цель - сделать мой конвейер динамическим, поэтому я не хочу использовать Select для получения схема.
Также я попробовал Row(), но получил ошибку NameError: имя «Строка» не определено. Вы имели в виду: «pow»?
Попробовал много вещей, но все еще не могу использовать код для передачи данных из BQ в MSSQL. Что мне делать?
Мой полный код:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
import random
from typing import NamedTuple
from apache_beam import Row # Also tried from apache_beam.pvalue import Row
def create_pipeline_options(project, region, job_name, temp_location, staging_location):
options = PipelineOptions() # GoogleCloudOptions, WorkerOptions
return options
class Gdk(NamedTuple): # Also tried TypedDict
column1: int
column2: int
column3: float
column4: str
def run_pipeline():
project = "dwh-dev"
region = "europe-west3"
job_name = f"pipeline2o{random.randint(1, 100)}"
temp_location = "gs://dataflow-dev..."
staging_location = "gs://dataflow-dev..."
# JDBC connection details
jdbc_table = "table_name"
jdbc_driver_class = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_url = "jdbc:sqlserver://0.0.0.0:1433..."
jdbc_user = "user"
jdbc_password = "password"
# Create the pipeline
options = create_pipeline_options(project, region, job_name, temp_location, staging_location)
p = beam.Pipeline(options=options)
beam.coders.registry.register_coder(Gdk, beam.coders.RowCoder)
# BigQuery query to read data
query = "SELECT column1, column2, column3, column4 FROM `dwh-dev.datamart.vw_stocksales`"
def mod_fn(x, ln):
return x['column1'] % 4
def gdk_to_dict(gdk) -> dict: # Also tried out of the run_pipeline function
return {
"column1": gdk["column1"],
"column2": gdk["column2"],
"column3": gdk["column3"],
"column4": gdk["column4"]
}
partitions = (
p | "ReadFromBigQuery" >> ReadFromBigQuery(query=query, use_standard_sql=True)
| "PartitionData" >> beam.Partition(mod_fn, 4)
)
for i, partition in enumerate(partitions):
(
partition
| f"ConvertToRow-{i}" >> beam.Map(lambda data: Gdk(column1=data["column1"], # Also tried gdk_to_dict function to convert Gdk to dict
column2=data["column2"],
column3=data["column3"],
column4=data["column4"])).with_output_types(Gdk)
# This returns error: NameError: name 'Row' is not defined. Did you mean: 'pow'?
# | f"ConvertToRow-{i}" >> beam.Map(lambda data: Row(column1=data["column1"],
# column2=data["column2"],
# column3=data["column3"],
# column4=data["column4"])).with_output_types(Gdk)
| f"WriteToJdbc-{i}" >> WriteToJdbc(
table_name=jdbc_table,
driver_class_name=jdbc_driver_class,
jdbc_url=jdbc_url,
username=jdbc_user,
password=jdbc_password,
)
)
p.run()
if __name__ == "__main__":
run_pipeline()
Подробнее здесь: https://stackoverflow.com/questions/793 ... python-sdk
Apache Beam, создание схемы для WriteToJdbc в Python SDK ⇐ Python
Программы на Python
-
Anonymous
1737671511
Anonymous
Я пытаюсь создать конвейер для чтения из BigQuery и записи в таблицу MS SQL. Чтение из фазы BigQuery в порядке, но запись в MS SQL через WriteToJdbc невозможна из-за схемы.
Я хочу выполнить запрос, поэтому не могу использовать ReadFromBigQuery(table=".. .", output_type='BEAM_ROW'). Даже если я могу использовать BEAM_ROW, конвейеру требуется схема после шага beam.Partition.
Я получал сообщение «Невозможно вызвать getSchema, когда схемы нет» ошибка перед использованием with_output_types.
Все еще работает неправильно. Итак, теперь я получаю сообщение об ошибке NameError: имя 'Gdk' не определено.
Я исследовал документацию Beam Schema, моя конечная цель - сделать мой конвейер динамическим, поэтому я не хочу использовать Select для получения схема.
Также я попробовал Row(), но получил ошибку NameError: имя «Строка» не определено. Вы имели в виду: «pow»?
Попробовал много вещей, но все еще не могу использовать код для передачи данных из BQ в MSSQL. Что мне делать?
Мой полный код:
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery
from apache_beam.io.jdbc import WriteToJdbc
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, StandardOptions, WorkerOptions
import random
from typing import NamedTuple
from apache_beam import Row # Also tried from apache_beam.pvalue import Row
def create_pipeline_options(project, region, job_name, temp_location, staging_location):
options = PipelineOptions() # GoogleCloudOptions, WorkerOptions
return options
class Gdk(NamedTuple): # Also tried TypedDict
column1: int
column2: int
column3: float
column4: str
def run_pipeline():
project = "dwh-dev"
region = "europe-west3"
job_name = f"pipeline2o{random.randint(1, 100)}"
temp_location = "gs://dataflow-dev..."
staging_location = "gs://dataflow-dev..."
# JDBC connection details
jdbc_table = "table_name"
jdbc_driver_class = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_url = "jdbc:sqlserver://0.0.0.0:1433..."
jdbc_user = "user"
jdbc_password = "password"
# Create the pipeline
options = create_pipeline_options(project, region, job_name, temp_location, staging_location)
p = beam.Pipeline(options=options)
beam.coders.registry.register_coder(Gdk, beam.coders.RowCoder)
# BigQuery query to read data
query = "SELECT column1, column2, column3, column4 FROM `dwh-dev.datamart.vw_stocksales`"
def mod_fn(x, ln):
return x['column1'] % 4
def gdk_to_dict(gdk) -> dict: # Also tried out of the run_pipeline function
return {
"column1": gdk["column1"],
"column2": gdk["column2"],
"column3": gdk["column3"],
"column4": gdk["column4"]
}
partitions = (
p | "ReadFromBigQuery" >> ReadFromBigQuery(query=query, use_standard_sql=True)
| "PartitionData" >> beam.Partition(mod_fn, 4)
)
for i, partition in enumerate(partitions):
(
partition
| f"ConvertToRow-{i}" >> beam.Map(lambda data: Gdk(column1=data["column1"], # Also tried gdk_to_dict function to convert Gdk to dict
column2=data["column2"],
column3=data["column3"],
column4=data["column4"])).with_output_types(Gdk)
# This returns error: NameError: name 'Row' is not defined. Did you mean: 'pow'?
# | f"ConvertToRow-{i}" >> beam.Map(lambda data: Row(column1=data["column1"],
# column2=data["column2"],
# column3=data["column3"],
# column4=data["column4"])).with_output_types(Gdk)
| f"WriteToJdbc-{i}" >> WriteToJdbc(
table_name=jdbc_table,
driver_class_name=jdbc_driver_class,
jdbc_url=jdbc_url,
username=jdbc_user,
password=jdbc_password,
)
)
p.run()
if __name__ == "__main__":
run_pipeline()
Подробнее здесь: [url]https://stackoverflow.com/questions/79382761/apache-beam-creating-schema-for-writetojdbc-in-python-sdk[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия