Медленная миграция данных из Snowflake в MySQL в Python с использованием SQlAlchemyMySql

Форум по Mysql
Ответить Пред. темаСлед. тема
Anonymous
 Медленная миграция данных из Snowflake в MySQL в Python с использованием SQlAlchemy

Сообщение Anonymous »

Итак, у меня есть большой объем данных в Snowflake, копию которых я хотел бы сохранить на имеющемся у меня сервере MySQL. Я создал этот сценарий. Я просто хочу сохранить копию данных в MySQL не для использования в разработке или производстве, а просто сохранить копию.

Код: Выделить всё

from sqlalchemy import create_engine
from sqlalchemy import text
import pandas as pd
import time

snowflake_engine = create_engine(
'snowflake://{user}:{password}@{account}/{database_name}/{schema_name}?warehouse={warehouse_name}'.format(
user='XXXXXX',
password='XXXXXX',
account='XXXX-XXXXX',
warehouse_name='WAREHOUSE',
database_name='XXXXX',
schema_name='XXXXX'
)
)

mysql_engine = create_engine('mysql+mysqlconnector://XXXXX:[email protected]:3306/XXXXXXX')

schema = 'XXXXXXX'
table_name = ''

# Fetch data in chunks and append to MySQL
chunk_size = 2500
try:
snowflake_connection = snowflake_engine.connect()
mysql_connection = mysql_engine.connect()

# Query to fetch table names
query = f"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA='{schema}'"
print(f"Fetching table names from schema: {schema}...")
tables_df = pd.read_sql(text(query), snowflake_connection)
total_tables = len(tables_df)

# Iterate through each table
for index, row in tables_df.iterrows():
table_name = row['table_name']
print(f"Fetching data from table: {table_name}...")

#fetch entire table data in chunks
offset = 0
while True:
#fetch the chunk of data
table_query = f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {offset}"
df = pd.read_sql(text(table_query), snowflake_connection)

if not df.empty:
# Save the dataframe to MySQL database in chunks
df.to_sql(table_name, con=mysql_engine, if_exists='append', index=False)
print(f"Processed chunk for table {table_name}, offset {offset}")

# Move the offset to fetch the next chunk
offset += chunk_size
else:
break  # Exit the loop when no more rows are returned

print(f"Table {index+1} of {total_tables} has been processed")

finally:
snowflake_connection.close()
snowflake_engine.dispose()
mysql_connection.close()
mysql_engine.dispose()
Это работает. Проблема в том, что передача данных происходит очень медленно. Обработка одной партии занимает 5 минут и более. Перед добавлением пакетных запросов я получал эту ошибку, и сценарий закрывался. Теперь я получаю это после того, как скрипт работал целый день:

Код: Выделить всё

sqlalchemy.exc.ProgrammingError: (snowflake.connector.errors.ProgrammingError) 000629 (57014): Warehouse 'WAREHOUSE' was suspended immediate by resource monitor 'RESOURCEMONITOR', statement aborted.
[SQL: SELECT * FROM XXXXXXXXX LIMIT 2500 OFFSET 1047500]
(Background on this error at: https://sqlalche.me/e/20/f405)
Итак, как мне изменить этот скрипт, чтобы без проблем перенести данные? Пожалуйста, предложите некоторые изменения, которые я могу внести.
Всего существует 115 таблиц, и по крайней мере 40% из них содержат более миллиона строк.

Подробнее здесь: https://stackoverflow.com/questions/793 ... sqlalchemy
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «MySql»