У меня есть следующий код:
Код: Выделить всё
import requests
import pyarrow as pa
def create_record_batches(lines, schema):
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
for line in lines:
if line:
newline = [{col_name: val for col_name, val in zip(schema.names, line.decode().split(","))}]
writer.write(pa.RecordBatch.from_pylist(newline))
else:
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()
# writer.close()
sink = pa.BufferOutputStream()
writer = pa.ipc.RecordBatchStreamWriter(sink, schema)
writer.close()
buffer_value = sink.getvalue()
yield buffer_value.to_pybytes()
with requests.get(
url="https://data.cityofnewyork.us/api/views/kxp8-n2sj/rows.csv?accessType=DOWNLOAD",
stream=True
) as response:
response.raise_for_status()
lines = response.iter_lines(chunk_size=128*1024*1024, delimiter=b"\n")
schema_str = next(lines).decode()
schema = pa.schema([ (col_name, pa.string()) for col_name in schema_str.split(",") ])
buffer = create_record_batches(lines, schema)
counter = 0
for buf in buffer:
if counter == 3:
break
# print(buf)
print("*")
print("**********")
print("**********")
counter += 1
- Я использую Request.get(url,stream=True) для получения ответа и использую response.iter_lines(chunk_size=128*1024*1024) для иметь возможность манипулировать фрагментами размером 128 МБ.
- Поскольку первая строка файла является заголовком, я использую следующее для создания схемы Pyarrow:
Код: Выделить всё
schema_str = next(lines).decode() schema = pa.schema([ (col_name, pa.string()) for col_name in schema_str.split(",") ])
- Для остальных строк я вызываю функцию create_record_batches(lines, Schema). Эта функция отвечает за итерацию каждой строки каждого фрагмента для создания RecordBatch и записи его в объект RecordBatchStreamWriter.
В конце фрагмента представлено с помощью пустой строки, например b'', мы вводим вариант else, и я получаю байты из своего приемника, поэтому он должен возвращать весь RecordBatch определенного фрагмента. - Тем не менее, в другом случае я создаю совершенно новый приемник и записывающее устройство, мое намерение здесь — очистить и приемник, и записывающее устройство. Я пытался использовать раковину.truncate() и раковину.seek(0), но мне не удалось это сделать, поскольку раковина.getvalue() закрывает приемник.
- Наконец, я закрываю средство записи и возвращаю оставшееся содержимое в приемник, но не знаю, имеет ли это смысл, учитывая, что я использую iter_lines()..?
Что я не понимаю в своем текущем коде, так это то, что когда я проверяю потребление памяти ( используя htop) моей программы во время отладки и имея точку останова на каждом объекте buf, я ожидаю, что объем памяти будет колебаться около 128 МБ, поскольку это размер моих фрагментов, но память увеличивается, поэтому я делаю что-то не так, но я не знаю что. Я чувствую, что это происходит потому, что приемник и записывающее устройство на самом деле не «сбрасываются».
Что касается записи в файлы Parquet, я имею в виду следующее: чтобы использовать каждый объект buf для создания PyArrow.Table, а затем записать каждую таблицу как файл Parquet, для создания PyArrow.Table я думаю, что буду использовать следующее:
Код: Выделить всё
import pyarrow.parquet as pq
...
for i_chunk, buf in enumerate(buffer):
table = pa.ipc.open_stream(pa.py_buffer(buf)).read_all()
file_path = f"some_file_name_chunk{i_chunk}.parquet"
pq.write_table(table, file_path)
Большое спасибо, если вы поможете мне научиться лучше управлять библиотекой PyArrow!
PS: Я решил не использовать какие-либо библиотеки, такие как pandas или csv, моя цель — попытаться манипулировать только объектами буфера и потока, если это возможно.
Подробнее здесь: https://stackoverflow.com/questions/791 ... uet-file-u