Настройка (на которую я не могу повлиять) такая: :
- 0 графических процессоров
- 4000 процессоров
15,0 Гигабайт памяти
Я начал с фрагментов по 500 000 строк, но это привело к сбою ядра. Пробовал 250.000, результат тот же. Сейчас на 100 000, но все равно происходит сбой.
В соответствии с правилами компании мне нужно выполнить первоначальное подключение к базе данных, как показано ниже, и оно работает:
Код: Выделить всё
# Connection to SQL Server with Kerberos + pyodbc
def mssql_conn_kerberos(server, driver, trusted_connection, trust_server_certificate, kerberos_cmd):
# Run Kerberos for authentifications
os.system(kerberos_cmd)
try:
# First connection attempt
c_conn = pyodbc.connect(
f'DRIVER={driver};'
f'SERVER={server};'
f'Trusted_Connection={trusted_connection};'
f'TrustServerCertificate={trust_server_certificate}'
)
except:
# Re-run Kerberos and try authentification
os.system(kerberos_cmd)
c_conn = pyodbc.connect(
f"DRIVER={driver};"
f"SERVER={server};"
f"Trusted_Connection={trusted_connection};"
f"TrustServerCertificate={trust_server_certificate}"
)
c_cursor = c_conn.cursor()
print("Pyodbc connection ready.")
return c_conn # Connection to the database
Код: Выделить всё
def call_my_query(path_to_query, query_name, chunk, connection):
file_path = os.path.join(path_to_query, query_name)
with open(file_path, "r") as file:
query = file.read()
# SQL processing in chunks + time
chunks = []
start_time = time.time()
for x in pd.read_sql_query(query, connection, chunksize=chunk):
chunks.append(x)
# Concating the chunks - joining all the chunks together
df = pd.concat(chunks, ignore_index=True)
# Process end-time
end_time = time.time()
print("Data loaded successfully!")
print(f'Processed {len(df)} rows in {end_time - start_time:.2f} seconds')
return df
Ядро аварийно завершилось при выполнении кода в текущей или предыдущей ячейке.
Проверьте код в ячейках, чтобы определить возможную причину сбоя.
Нажмите здесь, чтобы получить дополнительную информацию.
Просмотреть Более подробную информацию можно найти в журнале Jupyter.
Я также пытался запустить эту задачу через Dask, изменив функцию call_my_query, но по какой-то причине Dask вызывает проблемы с pyodbc.
Изменение call_my_query для Dask:
Код: Выделить всё
def call_my_query_dask(query_name, chunk, connection, index_col):
# Load query from file
file_path = os.path.join(path_to_query, query_name)
with open(file_path, "r") as file:
query_original = file.read()
# Convert the SQL string/text
query = sqlalchemy.select(query_original)
# Start timing the process
start_time = time.time()
# Use Dask to read the SQL query in chunks
print("Executing query and loading data with Dask...")
df_dask = dd.read_sql_query(
sql=query,
con=connection_url,
npartitions=10,
index_col = index_col
)
# Process end-time
end_time = time.time()
print("Data loaded successfully!")
print(f"Processed approximately {df_dask.shape[0].compute()} rows in {end_time - start_time:.2f} seconds")
return df_dask
Выражение текстового столбца 'SELECT\n\t[COL1]\n\ t, [COL...' должен быть явно объявлен с текстом('SELECT\n\t[COL1]\n\t, [COL...') или использовать literal_column('SELECT\n\t[COL1] \н\т, [COL...') для большей конкретики.
Спасибо всем за помощь.
Подробнее здесь: https://stackoverflow.com/questions/793 ... r-database