- Загрузить некоторые данные из Интернета (размером около 5 ГБ)
- Возможно преобразовать некоторые строки/даты и время
- Загрузить в базу данных Postgres
Я попробовал несколько вариантов в поисках повышения производительности. Данные считываются из файла csv. Размер файла — 5 ГБ.
На создание исходной версии моего кода ушло около 90 минут. Это 5120 МБ за 90 минут или 57 МБ/мин, 1 МБ/с. Данные находятся на том же хосте, что и база данных. Между источником данных и приемником нет сетевого соединения, за исключением ядра Linux. (Все осуществляется через локальный хост.)
В первой версии используется следующий код.
Код: Выделить всё
df.to_sql(
name='table',
schema='schema',
con=postgres_engine,
if_exists='replace',
index=False,
)
Код: Выделить всё
chunksize=1000,
Код: Выделить всё
method='multi',
Когда я включил метод ='multi', процесс так и не завершился. Это заняло больше 5 часов, потом я его убил.
Особых улучшений при использовании chunksize я не заметил. Я пробовал значения 1000 и 100000.
psycopg2 действительно предлагает что-то ближе к форме оператора «загрузки» в форме copy_expert. Я обнаружил, что это заняло около 15 минут.
Код: Выделить всё
connection = psycopg2.connect(
host=postgres_host,
user=postgres_user,
password=postgres_password,
dbname=postgres_database,
port=5432,
)
buffer = io.StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
cursor = connection.cursor()
cursor.copy_expert(
'COPY schema.table FROM STDIN WITH (FORMAT csv)',
buffer,
)
connection.commit()
Последовательность операций может показаться немного странной. Сначала файл читается с помощью pandas.read_csv. Затем данные записываются в объект StringIO с использованием DataFrame.to_csv. Это выходит за рамки этого вопроса, но мне нужен какой-то метод перевода строки даты и времени из одного (странного) формата в формат ISO, который понимает Postgres/Pandas.
Данные получаются из вызова запросов. Другими словами, загружено с какого-то удаленного сервера.
Кроме того: формат даты и времени
Если вы хотите узнать больше деталей, источник предоставляет данные с датами в следующем формате, который не является ISO и поэтому я предполагаю, что Postgres не читается.
Код: Выделить всё
format='%Y-%m-%d %H:%M'
Похоже, что существует большое количество возможных методов, учитывающих все различия в аргументах, которые могут быть переданы при каждом вызове функции.
В конце концов, все, что мне действительно нужно сделать, это загрузить некоторые данные из Интернет и загрузите его в Postgres, возможно, с некоторыми манипуляциями со строками, чтобы учесть различия в распознаваемых форматах даты и времени.
Какой метод мне следует использовать?
- psycopg (psycopg3) или psycopg2?
- sqlalchemy или нет?
- Какие пакеты нужны? для установки с помощью pip3?
- Должен ли я использовать pandas.to_sql или какой-либо другой метод, например copy_expert или что-то еще?
Я обнаружил, что следующее решение psycopg3 работает хорошо и дает хорошие результаты производительность.
Код: Выделить всё
pip3 install psycopg[binary]
Пример кода приведен ниже. Здесь я предполагаю, что some_data — это объект в байтах, возвращаемый чем-то вроде ответа = request.get, some_data = response.content.
Если вы получаете данные из какого-то другого места, например из локального файла, измените это соответствующим образом.
Обратите также внимание, что вы можете получить повышенную производительность, вообще не используя pandas. В моем конкретном случае мне нужно прочитать данные, изменить их, а затем записать обратно в объект StringIO. Однако вы можете прочитать свои данные непосредственно в объект StringIO, минуя как минимум две дорогостоящие операции. (
Код: Выделить всё
read_csvКод: Выделить всё
import pandas
import psycopg
import io
postgres_connection_string = f'user={}, password={}, host={}, dbname={}, port={}'
df = pandas.read_csv(io.BytesIO(some_data), header=None)
buffer = io.StringIO()
df.to_csv(buffer, index=False, header=False)
buffer.seek(0)
columns = '(column_1, column_2)'
with psycopg.connect(postgres_connection_string) as connection:
with connection.cursor() as cursor:
with cursor.copy(f'COPY some_schema.some_table {columns} FROM STDIN WITH (FORMAT csv)') as copy:
copy.write(file.read())
conn.commit()
Подробнее здесь: https://stackoverflow.com/questions/798 ... stest-meth
Мобильная версия