Сейчас я работаю над проектом, в котором мне нужно перенести большой объем данных (около 37 миллионов записей) из postgreSQL в AzureSQL. Один столбец содержит полезную нагрузку JSON, хранящуюся как jsonb. Эти полезные данные могут иметь длину более 4000 символов.
Есть и вторая проблема: мне нужно проанализировать полезные данные и извлечь из них определенные элементы. Поэтому происходит небольшая трансформация.
Мне удалось придумать сценарий, который способен мигрировать со скоростью примерно 300 записей в секунду, но это все равно занимает у меня часы. перенести это. Также я заметил, что DTU моего AzureSQL уже работают на 100%.
Поэтому сейчас я пытаюсь использовать Execute_many в своем скрипте. Я ожидал, что этот скрипт будет работать с приличной скоростью, поскольку я переношу только две строки, и на опыте я достиг очень высокой производительности с другими скриптами и excumte_many/insert как можно большим количеством подходов (до 10 КБ в секунду). В результате, как уже упоминалось выше, я выходил примерно на 240-300 входов в секунду. Итак, скрипт работает сам по себе, но он СЛИШКОМ МЕДЛЕН. Миграция занимает примерно более 30 часов, а это слишком долго.
Мне интересно, существует ли более эффективный способ массовой вставки этих данных. Ниже я предоставил скрипт. В этом скрипте я не занимаюсь синтаксическим анализом, потому что сначала хочу решить самую сложную задачу, а именно перенести огромную задницу с приличной скоростью.
Есть ли подходы получше, например использовать фрейм данных? Используя даже другую технологию? Или вы считаете, что сценарий уже эффективен, а ограничивающим фактором является DTU моего AzureSQL?
Моя цель – добиться более высокой производительности, то есть БОЛЬШЕ вставок в секунду, поэтому Вся миграция происходит в разумные сроки.
import time
import psycopg2
import pyodbc
import config
# Configuration
# PostgreSQL connection details
# Azure SQL connection details
# Source and target info
# Number of rows per batch
BATCH_SIZE = 1000
COLUMNS = ["id", "payload"]
# ------------------------------------------------------------------
# PostgreSQL: Connect and create server-side cursor
# ------------------------------------------------------------------
def get_postgres_cursor():
conn = psycopg2.connect(
host=PG_HOST,
port=PG_PORT,
dbname=PG_DBNAME,
user=PG_USER,
password=PG_PASSWORD
)
conn.autocommit = False
cursor = conn.cursor(name="statement_cursor")
cursor.execute(f"""
SELECT
id,
payload::text AS payload
FROM {PG_SCHEMA}.{PG_TABLE}
ORDER BY statement_id
""")
return conn, cursor
# ------------------------------------------------------------------
# Azure SQL: Connect (pyodbc)
# ------------------------------------------------------------------
def get_azure_cursor():
conn_str = (
"DRIVER={ODBC Driver 18 for SQL Server};"
f"SERVER={AZURESQL_HOST};"
f"DATABASE={AZURESQL_DBNAME};"
f"UID={AZURESQL_USER};"
f"PWD={AZURESQL_PASSWORD};"
)
conn = pyodbc.connect(conn_str, autocommit=False)
cursor = conn.cursor()
cursor.fast_executemany = True
# Handle large string columns
cursor.setinputsizes([
(pyodbc.SQL_WVARCHAR, 255, 0), # 'id' with max length 255
(pyodbc.SQL_WVARCHAR, 0, 0) # 'payload' as large text
])
return conn, cursor
# ------------------------------------------------------------------
# Migration Function
# ------------------------------------------------------------------
def migrate_data():
pg_conn, pg_cursor = get_postgres_cursor()
azure_conn, azure_cursor = get_azure_cursor()
# Build the INSERT statement
column_list = ", ".join(COLUMNS)
placeholders = ", ".join(["?"] * len(COLUMNS))
insert_sql = f"INSERT INTO {AZURE_SCHEMA}.{AZURE_TABLE} ({column_list}) VALUES ({placeholders})"
total_migrated = 0
start_time = time.time()
print("Starting migration...")
try:
while True:
# Fetch next batch from PostgreSQL
rows = pg_cursor.fetchmany(BATCH_SIZE)
if not rows:
break # no more data
# Convert rows to list of tuples
insert_data = list(rows)
try:
azure_cursor.executemany(insert_sql, insert_data)
azure_conn.commit()
except pyodbc.Error as ex:
azure_conn.rollback()
print(f"Error in batch insert: {ex}")
# Optionally, implement retry logic or log the failed batch
# Report progress
total_migrated += len(insert_data)
elapsed = time.time() - start_time
speed = total_migrated / (elapsed if elapsed else 1.0)
print(f"Migrated {total_migrated} rows at ~{speed:.1f} rows/sec")
except Exception as e:
print(f"An error occurred during migration: {e}")
finally:
# Cleanup PostgreSQL
pg_cursor.close()
pg_conn.commit()
pg_conn.close()
# Cleanup Azure SQL
azure_cursor.close()
azure_conn.close()
print(f"Done! Total rows migrated: {total_migrated}.")
if __name__ == "__main__":
migrate_data()
Подробнее здесь: https://stackoverflow.com/questions/793 ... o-azuresql
Самый эффективный способ передачи больших строк из postgreSQL в Azure SQL. ⇐ Python
Программы на Python
1737939400
Anonymous
Сейчас я работаю над проектом, в котором мне нужно перенести большой объем данных (около 37 миллионов записей) из postgreSQL в AzureSQL. Один столбец содержит полезную нагрузку JSON, хранящуюся как jsonb. Эти полезные данные могут иметь длину более 4000 символов.
Есть и вторая проблема: мне нужно проанализировать полезные данные и извлечь из них определенные элементы. Поэтому происходит небольшая трансформация.
Мне удалось придумать сценарий, который способен мигрировать со скоростью примерно 300 записей в секунду, но это все равно занимает у меня часы. перенести это. Также я заметил, что DTU моего AzureSQL уже работают на 100%.
Поэтому сейчас я пытаюсь использовать Execute_many в своем скрипте. Я ожидал, что этот скрипт будет работать с приличной скоростью, поскольку я переношу только две строки, и на опыте я достиг очень высокой производительности с другими скриптами и excumte_many/insert как можно большим количеством подходов (до 10 КБ в секунду). В результате, как уже упоминалось выше, я выходил примерно на 240-300 входов в секунду. Итак, скрипт работает сам по себе, но он [b]СЛИШКОМ МЕДЛЕН. Миграция занимает примерно более 30 часов, а это слишком долго.[/b]
Мне интересно, существует ли более эффективный способ массовой вставки этих данных. Ниже я предоставил скрипт. В этом скрипте я не занимаюсь синтаксическим анализом, потому что сначала хочу решить самую сложную задачу, а именно перенести огромную задницу с приличной скоростью.
Есть ли подходы получше, например использовать фрейм данных? Используя даже другую технологию? Или вы считаете, что сценарий уже эффективен, а ограничивающим фактором является DTU моего AzureSQL?
Моя цель – добиться более высокой производительности, то есть БОЛЬШЕ вставок в секунду, поэтому Вся миграция происходит в разумные сроки.
import time
import psycopg2
import pyodbc
import config
# Configuration
# PostgreSQL connection details
# Azure SQL connection details
# Source and target info
# Number of rows per batch
BATCH_SIZE = 1000
COLUMNS = ["id", "payload"]
# ------------------------------------------------------------------
# PostgreSQL: Connect and create server-side cursor
# ------------------------------------------------------------------
def get_postgres_cursor():
conn = psycopg2.connect(
host=PG_HOST,
port=PG_PORT,
dbname=PG_DBNAME,
user=PG_USER,
password=PG_PASSWORD
)
conn.autocommit = False
cursor = conn.cursor(name="statement_cursor")
cursor.execute(f"""
SELECT
id,
payload::text AS payload
FROM {PG_SCHEMA}.{PG_TABLE}
ORDER BY statement_id
""")
return conn, cursor
# ------------------------------------------------------------------
# Azure SQL: Connect (pyodbc)
# ------------------------------------------------------------------
def get_azure_cursor():
conn_str = (
"DRIVER={ODBC Driver 18 for SQL Server};"
f"SERVER={AZURESQL_HOST};"
f"DATABASE={AZURESQL_DBNAME};"
f"UID={AZURESQL_USER};"
f"PWD={AZURESQL_PASSWORD};"
)
conn = pyodbc.connect(conn_str, autocommit=False)
cursor = conn.cursor()
cursor.fast_executemany = True
# Handle large string columns
cursor.setinputsizes([
(pyodbc.SQL_WVARCHAR, 255, 0), # 'id' with max length 255
(pyodbc.SQL_WVARCHAR, 0, 0) # 'payload' as large text
])
return conn, cursor
# ------------------------------------------------------------------
# Migration Function
# ------------------------------------------------------------------
def migrate_data():
pg_conn, pg_cursor = get_postgres_cursor()
azure_conn, azure_cursor = get_azure_cursor()
# Build the INSERT statement
column_list = ", ".join(COLUMNS)
placeholders = ", ".join(["?"] * len(COLUMNS))
insert_sql = f"INSERT INTO {AZURE_SCHEMA}.{AZURE_TABLE} ({column_list}) VALUES ({placeholders})"
total_migrated = 0
start_time = time.time()
print("Starting migration...")
try:
while True:
# Fetch next batch from PostgreSQL
rows = pg_cursor.fetchmany(BATCH_SIZE)
if not rows:
break # no more data
# Convert rows to list of tuples
insert_data = list(rows)
try:
azure_cursor.executemany(insert_sql, insert_data)
azure_conn.commit()
except pyodbc.Error as ex:
azure_conn.rollback()
print(f"Error in batch insert: {ex}")
# Optionally, implement retry logic or log the failed batch
# Report progress
total_migrated += len(insert_data)
elapsed = time.time() - start_time
speed = total_migrated / (elapsed if elapsed else 1.0)
print(f"Migrated {total_migrated} rows at ~{speed:.1f} rows/sec")
except Exception as e:
print(f"An error occurred during migration: {e}")
finally:
# Cleanup PostgreSQL
pg_cursor.close()
pg_conn.commit()
pg_conn.close()
# Cleanup Azure SQL
azure_cursor.close()
azure_conn.close()
print(f"Done! Total rows migrated: {total_migrated}.")
if __name__ == "__main__":
migrate_data()
Подробнее здесь: [url]https://stackoverflow.com/questions/79389561/most-efficient-way-to-transfer-large-strings-from-postgresql-to-azuresql[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия