Сервер Oracle расположен далеко от конечной точки Snowflake, поэтому у нас возникают проблемы с подключением при загрузке таблиц (фактически представлений) размером более 12. ГБ с помощью сценария буферизации или cx_oracle.
Я думал об использовании ThreadPoolExecutor с максимум 4 потоками для тестирования и использования SessionPool. При этом я получаю соединение на поток, в этом весь смысл. Итак, это означает, что мне придется распределять выборку данных по пакетам для каждого потока.
Мой вопрос: как я могу этого добиться? Правильно ли сделать что-то вроде:
"Выберите * из таблицы, где число строк между x и y" (я знаю, это не этот синтаксис... но вы поняли мою точку зрения), следует ли мне полагаться на OFFSET,... ?
Моя идея заключалась в том, что каждый поток получает «кусок» select , извлекает данные пакетами и записывает строки в csv также пакетно, потому что я предпочитаю небольшие файлы, а не огромные файл, чтобы отправить снежинка.
Код: Выделить всё
def query(start_off, pool):
start_conn = datetime.now()
con = pool.acquire()
end_conn = datetime.now()
print(f"Conn/Acquire time: {end_conn-start_conn}")
with con.cursor() as cur:
start_exec_ts = datetime.now()
cur.execute(QUERY, start_pos=start_off, end_pos=start_off+(OFFSET-1))
end_exec_ts = datetime.now()
rows = cur.fetchall()
end_fetch_ts = datetime.now()
total_exec_ts = end_exec_ts-start_exec_ts
total_fetch_ts = end_fetch_ts-end_exec_ts
print(f"Exec time : {total_exec_ts}")
print(f"Fetch time : {total_fetch_ts}")
print(f"Task executed {threading.current_thread().getName()}, {threading.get_ident()}")
return rows
def main():
pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
c.oracle_conn['oracle']['password'],
c.oracle_conn['oracle']['dsn'],
min=2, max=4, increment=1,
threaded=True,
getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
)
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(query, d, pool) for d in range(1,13,OFFSET)]
for future in as_completed(futures):
# process your records from each thread
print(repr(future.result()))
# process_records(future.result())
if __name__ == '__main__':
main()
Подробнее здесь: https://stackoverflow.com/questions/689 ... or-each-th
Мобильная версия