Cx_oracle ThreadPoolExecutor с SessionPool и распределением нагрузки для каждого потокаPython

Программы на Python
Ответить
Anonymous
 Cx_oracle ThreadPoolExecutor с SessionPool и распределением нагрузки для каждого потока

Сообщение Anonymous »

У меня есть вариант использования, который заключается в загрузке огромных таблиц из Oracle в Snowflake.
Сервер Oracle расположен далеко от конечной точки Snowflake, поэтому у нас возникают проблемы с подключением при загрузке таблиц (фактически представлений) размером более 12. ГБ с помощью сценария буферизации или cx_oracle.
Я думал об использовании ThreadPoolExecutor с максимум 4 потоками для тестирования и использования SessionPool. При этом я получаю соединение на поток, в этом весь смысл. Итак, это означает, что мне придется распределять выборку данных по пакетам для каждого потока.
Мой вопрос: как я могу этого добиться? Правильно ли сделать что-то вроде:
"Выберите * из таблицы, где число строк между x и y" (я знаю, это не этот синтаксис... но вы поняли мою точку зрения), следует ли мне полагаться на OFFSET,... ?
Моя идея заключалась в том, что каждый поток получает «кусок» select , извлекает данные пакетами и записывает строки в csv также пакетно, потому что я предпочитаю небольшие файлы, а не огромные файл, чтобы отправить снежинка.

Код: Выделить всё

def query(start_off, pool):
start_conn = datetime.now()
con = pool.acquire()
end_conn = datetime.now()
print(f"Conn/Acquire time: {end_conn-start_conn}")

with con.cursor() as cur:
start_exec_ts = datetime.now()
cur.execute(QUERY, start_pos=start_off, end_pos=start_off+(OFFSET-1))
end_exec_ts = datetime.now()
rows = cur.fetchall()
end_fetch_ts = datetime.now()
total_exec_ts = end_exec_ts-start_exec_ts
total_fetch_ts = end_fetch_ts-end_exec_ts
print(f"Exec time : {total_exec_ts}")
print(f"Fetch time : {total_fetch_ts}")
print(f"Task executed {threading.current_thread().getName()}, {threading.get_ident()}")
return rows

def main():
pool = cx_Oracle.SessionPool(c.oracle_conn['oracle']['username'],
c.oracle_conn['oracle']['password'],
c.oracle_conn['oracle']['dsn'],
min=2, max=4, increment=1,
threaded=True,
getmode=cx_Oracle.SPOOL_ATTRVAL_WAIT
)

with ThreadPoolExecutor(max_workers=4) as executor:
futures = [executor.submit(query, d, pool) for d in range(1,13,OFFSET)]

for future in as_completed(futures):
# process your records from each thread
print(repr(future.result()))
# process_records(future.result())
if __name__ == '__main__':
main()
Кроме того, используя fetchMany в функции запроса, как я могу отправлять результаты обратно, чтобы иметь возможность обрабатывать их каждый раз?

Подробнее здесь: https://stackoverflow.com/questions/689 ... or-each-th
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»