Мне хотелось бы иметь возможность обрабатывать очень большие файлы в Polars без нехватки памяти. В документации предлагают использовать сканирование, ленивые фреймы и стоки, но сложно найти подходящую документацию о том, как это сделать на практике. Надеюсь, некоторые эксперты здесь смогут помочь.
Здесь я привожу пример того, что работает для «меньших» файлов, которые можно обрабатывать в памяти.
1. Настройка
# Imports
import pandas as pd
import polars as pl
import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow._hdfs import HadoopFileSystem
# Setting up HDFS file system
hdfs_filesystem = HDFSConnection('default')
hdfs_out_path_1 = "scanexample.parquet"
hdfs_out_path_2 = "scanexample2.parquet"
2. Создание данных
# Dataset
df = pd.DataFrame({
'A': np.arange(10000),
'B': np.arange(10000),
'C': np.arange(10000),
'D': np.arange(10000),
})
# Writing to Hadoop
pq_table = pa.Table.from_pandas(df)
pq_writer = pq.ParquetWriter(hdfs_out_path_1,
schema=pq_table.schema,
filesystem=hdfs_filesystem)
# Appending to parquet file
pq_writer.write_table(pq_table)
pq_writer.close()
3. Чтение паркета в кадр данных Polars (в памяти)
# Read file
pq_df = pl.read_parquet(source=hdfs_out_path_1,
use_pyarrow=True,
pyarrow_options={"filesystem": hdfs_filesystem})
4. Выполнение преобразований и запись в файл
# Transforms and write
pq_df.filter(pl.col('A')>9000)\
.write_parquet(file = hdfs_out_path_2, use_pyarrow=True, pyarrow_options={"filesystem": hdfs_filesystem})
5. Теперь делаем то же самое с нехваткой памяти
# Scanning file: Attempt 1
scan_df = pl.scan_parquet(source = hdfs_out_path_2)
ERROR: Cannot find file
# Scanning file: Attempt 2
scan_df = pl.scan_parquet(source = hdfs_filesystem.open_input_stream(hdfs_out_path_1))
ERROR: expected str, bytes or os.PathLike object, not pyarrow.lib.NativeFile
Согласно документации Polars, scan_parquet не принимает аргументы pyarrow. Но там говорится о некоторых «вариантах хранения», которые, я думаю, мне и нужно использовать. Но как?
6. Пример без Hadoop
# Writing to parquet
df.to_parquet(path="testlocal.parquet")
# Read lazily
lazy_df = pl.scan_parquet(source="testlocal.parquet")
# Transforms and write
lazy_df.filter(pl.col('A')>9000).sink_parquet(path= "testlocal.out.parquet")
ОБНОВЛЕНИЕ!
Хотя принятый ответ позволяет загружать данные в LazyFrame, этот lazyframe имеет ограниченную функциональность, поскольку он не может записать эти данные в файл, не собрав их сначала в памяти!
# Reading into LazyFrame
import pyarrow.dataset as ds
pq_lf = pl.scan_pyarrow_dataset(
ds.dataset(hdfs_out_path_1,
filesystem= hdfs_filesystem))
# Attempt at sinking to parquet
pq_lf.filter(pl.col('A')>9000).sink_parquet(path= "testlocal.out.parquet")
PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
Подробнее здесь: https://stackoverflow.com/questions/771 ... rom-hadoop