Самый эффективный способ передачи больших строк из postgreSQL в Azure SQL.Python

Программы на Python
Ответить
Anonymous
 Самый эффективный способ передачи больших строк из postgreSQL в Azure SQL.

Сообщение Anonymous »

Сейчас я работаю над проектом, в котором мне нужно перенести большой объем данных (около 37 миллионов записей) из postgreSQL в AzureSQL. Один столбец содержит полезную нагрузку JSON, хранящуюся как jsonb. Эти полезные данные могут иметь длину более 4000 символов.
Есть и вторая проблема: мне нужно проанализировать полезные данные и извлечь из них определенные элементы. Поэтому происходит небольшая трансформация.
Мне удалось придумать сценарий, который способен мигрировать со скоростью примерно 300 записей в секунду, но это все равно занимает у меня часы. перенести это. Также я заметил, что DTU моего AzureSQL уже работают на 100%.
Поэтому сейчас я пытаюсь использовать Execute_many в своем скрипте. Я ожидал, что этот скрипт будет работать с приличной скоростью, поскольку я переношу только две строки, и на опыте я достиг очень высокой производительности с другими скриптами и excumte_many/insert как можно большим количеством подходов (до 10 КБ в секунду). В результате, как уже упоминалось выше, я выходил примерно на 240-300 входов в секунду. Итак, скрипт работает сам по себе, но он СЛИШКОМ МЕДЛЕН. Миграция занимает примерно более 30 часов, а это слишком долго.
Мне интересно, существует ли более эффективный способ массовой вставки этих данных. Ниже я предоставил скрипт. В этом скрипте я не занимаюсь синтаксическим анализом, потому что сначала хочу решить самую сложную задачу, а именно перенести огромную задницу с приличной скоростью.
Есть ли подходы получше, например использовать фрейм данных? Используя даже другую технологию? Или вы считаете, что сценарий уже эффективен, а ограничивающим фактором является DTU моего AzureSQL?
Моя цель – добиться более высокой производительности, то есть БОЛЬШЕ вставок в секунду, поэтому Вся миграция происходит в разумные сроки.
import time
import psycopg2
import pyodbc
import config

# Configuration
# PostgreSQL connection details
# Azure SQL connection details
# Source and target info

# Number of rows per batch
BATCH_SIZE = 1000

COLUMNS = ["id", "payload"]

# ------------------------------------------------------------------
# PostgreSQL: Connect and create server-side cursor
# ------------------------------------------------------------------
def get_postgres_cursor():
conn = psycopg2.connect(
host=PG_HOST,
port=PG_PORT,
dbname=PG_DBNAME,
user=PG_USER,
password=PG_PASSWORD
)
conn.autocommit = False
cursor = conn.cursor(name="statement_cursor")
cursor.execute(f"""
SELECT
id,
payload::text AS payload
FROM {PG_SCHEMA}.{PG_TABLE}
ORDER BY statement_id
""")
return conn, cursor

# ------------------------------------------------------------------
# Azure SQL: Connect (pyodbc)
# ------------------------------------------------------------------
def get_azure_cursor():
conn_str = (
"DRIVER={ODBC Driver 18 for SQL Server};"
f"SERVER={AZURESQL_HOST};"
f"DATABASE={AZURESQL_DBNAME};"
f"UID={AZURESQL_USER};"
f"PWD={AZURESQL_PASSWORD};"
)
conn = pyodbc.connect(conn_str, autocommit=False)
cursor = conn.cursor()
cursor.fast_executemany = True

# Handle large string columns
cursor.setinputsizes([
(pyodbc.SQL_WVARCHAR, 255, 0), # 'id' with max length 255
(pyodbc.SQL_WVARCHAR, 0, 0) # 'payload' as large text
])
return conn, cursor

# ------------------------------------------------------------------
# Migration Function
# ------------------------------------------------------------------
def migrate_data():
pg_conn, pg_cursor = get_postgres_cursor()
azure_conn, azure_cursor = get_azure_cursor()

# Build the INSERT statement
column_list = ", ".join(COLUMNS)
placeholders = ", ".join(["?"] * len(COLUMNS))
insert_sql = f"INSERT INTO {AZURE_SCHEMA}.{AZURE_TABLE} ({column_list}) VALUES ({placeholders})"

total_migrated = 0
start_time = time.time()

print("Starting migration...")

try:
while True:
# Fetch next batch from PostgreSQL
rows = pg_cursor.fetchmany(BATCH_SIZE)
if not rows:
break # no more data

# Convert rows to list of tuples
insert_data = list(rows)

try:
azure_cursor.executemany(insert_sql, insert_data)
azure_conn.commit()
except pyodbc.Error as ex:
azure_conn.rollback()
print(f"Error in batch insert: {ex}")
# Optionally, implement retry logic or log the failed batch

# Report progress
total_migrated += len(insert_data)
elapsed = time.time() - start_time
speed = total_migrated / (elapsed if elapsed else 1.0)
print(f"Migrated {total_migrated} rows at ~{speed:.1f} rows/sec")

except Exception as e:
print(f"An error occurred during migration: {e}")

finally:
# Cleanup PostgreSQL
pg_cursor.close()
pg_conn.commit()
pg_conn.close()

# Cleanup Azure SQL
azure_cursor.close()
azure_conn.close()

print(f"Done! Total rows migrated: {total_migrated}.")

if __name__ == "__main__":
migrate_data()


Подробнее здесь: https://stackoverflow.com/questions/793 ... o-azuresql
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»