Прочитайте большой файл из API по частям и запишите каждый фрагмент как файл Parquet, используя PyArrow.Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Прочитайте большой файл из API по частям и запишите каждый фрагмент как файл Parquet, используя PyArrow.

Сообщение Anonymous »

Я экспериментирую с PyArrow, но мне трудно понять некоторые вещи. Чего я хочу добиться, так это прочитать большой файл (в данном примере CSV) по частям и сохранить каждый фрагмент как файл Parquet.
У меня есть следующий код:

Код: Выделить всё

import requests

import pyarrow as pa

def create_record_batches(lines, schema):
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
for line in lines:
if line:
newline = [{col_name: val for col_name, val in zip(schema.names, line.decode().split(","))}]
writer.write(pa.RecordBatch.from_pylist(newline))
else:
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()
# writer.close()
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)

writer.close()
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()

with requests.get(
url="https://data.cityofnewyork.us/api/views/kxp8-n2sj/rows.csv?accessType=DOWNLOAD",
stream=True
) as response:
response.raise_for_status()

lines = response.iter_lines(chunk_size=128*1024*1024, delimiter=b"\n")
schema_str = next(lines).decode()
schema = pa.schema([ (col_name, pa.string()) for col_name in schema_str.split(",") ])

buffer = create_record_batches(lines, schema)

counter = 0
for buf in buffer:
if counter == 3:
break
# print(buf)

print("*")
print("**********")
print("**********")

counter += 1

Я еще не на стадии написания чего-либо в виде файла Parquet, но позвольте мне разобрать мой код, а также подробно описать то, что мне непонятно.
  • Я использую Request.get(url,stream=True) для получения ответа и использую response.iter_lines(chunk_size=128*1024*1024) для иметь возможность манипулировать фрагментами размером 128 МБ.
  • Поскольку первая строка файла является заголовком, я использую следующее для создания схемы Pyarrow:

    Код: Выделить всё

    schema_str = next(lines).decode()
    schema = pa.schema([ (col_name, pa.string()) for col_name in schema_str.split(",") ])
    
  • Для остальных строк я вызываю функцию create_record_batches(lines, Schema). Эта функция отвечает за итерацию каждой строки каждого фрагмента для создания RecordBatch и записи его в объект RecordBatchStreamWriter.

    В конце фрагмента представлено с помощью пустой строки, например b'', мы вводим вариант else, и я получаю байты из своего приемника, поэтому он должен возвращать весь RecordBatch определенного фрагмента.
  • Тем не менее, в другом случае я создаю совершенно новый приемник и записывающее устройство, мое намерение здесь — очистить и приемник, и записывающее устройство. Я пытался использовать раковину.truncate() и раковину.seek(0), но мне не удалось это сделать, поскольку раковина.getvalue() закрывает приемник.
  • Наконец, я закрываю средство записи и возвращаю оставшееся содержимое в приемник, но не знаю, имеет ли это смысл, учитывая, что я использую iter_lines()..?

Что я не понимаю в своем текущем коде, так это то, что когда я проверяю потребление памяти ( используя htop) моей программы во время отладки и имея точку останова на каждом объекте buf, я ожидаю, что объем памяти будет колебаться около 128 МБ, поскольку это размер моих фрагментов, но память увеличивается, поэтому я делаю что-то не так, но я не знаю что. Я чувствую, что это происходит потому, что приемник и записывающее устройство на самом деле не «сбрасываются».

Что касается записи в файлы Parquet, я имею в виду следующее: чтобы использовать каждый объект buf для создания PyArrow.Table, а затем записать каждую таблицу как файл Parquet, для создания PyArrow.Table я думаю, что буду использовать следующее:

Код: Выделить всё

import pyarrow.parquet as pq

...

for i_chunk, buf in enumerate(buffer):
table = pa.ipc.open_stream(pa.py_buffer(buf)).read_all()
file_path = f"some_file_name_chunk{i_chunk}.parquet"
pq.write_table(table, file_path)
Иду ли я в правильном направлении с моим текущим кодом? а также, что я планирую делать, чтобы записывать свои фрагменты в виде файлов Parquet?
Большое спасибо, если вы поможете мне научиться лучше управлять библиотекой PyArrow!
PS: Я решил не использовать какие-либо библиотеки, такие как pandas или csv, моя цель — попытаться манипулировать только объектами буфера и потока, если это возможно.

Подробнее здесь: https://stackoverflow.com/questions/791 ... uet-file-u
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Чтение файлов Parquet с использованием parquet.net занимает больше времени, чем Pyarrow (Python)
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • Запишите файл Parquet с собственным образом GraalVM.
    Anonymous » » в форуме JAVA
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Потоковая потоковая передача Polars: Parquet Parquet на основе Shift (-1)
    Anonymous » » в форуме Python
    0 Ответы
    6 Просмотры
    Последнее сообщение Anonymous
  • ОШИБКА: Не удалось построить колесо для pyarrow (Не удалось построить pyarrow)
    Anonymous » » в форуме Python
    0 Ответы
    52 Просмотры
    Последнее сообщение Anonymous
  • Возникает ошибка при установке pyarrow: Не удалось собрать pyarrow.
    Anonymous » » в форуме Python
    0 Ответы
    24 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»