Я борюсь с проблемами с памятью при сортировке LazyFrame, превышающего объем памяти, с помощью поляров с помощью команды Skin_csv.
Я Я разрабатываю механизм, который сможет объединять и сортировать несколько больших наборов данных в виде отдельных файлов CSV.
Я работаю с двумя файлами по 6 ГБ для объединения и сортировки. . Я начал с модуля с 8 ГБ ОЗУ, но приложение аварийно завершает работу даже с 32 ГБ.
Примечание. Эти файлы не являются «обычными» CSV-файлами в том смысле, что у них есть собственный «заголовок». » и «трейлер» записывает его состав. Каждая строка идентифицируется «кодом записи» (первые 2 цифры). Различные типы записей могут иметь разное количество полей, поэтому я не могу просто прочитать весь файл с помощью простого символа «scan_csv» + разделитель.
Вот пример одного из этих файлов:< /p>
Код: Выделить всё
00;XPTO;99999991000197;20240905;130444;000001;20240905;130444;000001;20240901;0000
01;99900000001;F;1;0;000000321
01;00000000001;F;2;0;000000123
01;77700000003;F;0;0;000000999
01;22200000004;F;0;0;000000999
01;12300000004;F;0;0;000000999
99;00000000005;
- Создает LazyFrame со всеми частями, которые нужно объединить.
- Создать столбец с исходной строкой «как есть», поскольку мне не нужно каким-либо образом менять его форматирование или содержимое в выходном файле.
- Создать новый столбец, разделяющий исходную строку в списке с помощью предоставленного разделителя (;).
- Отфильтровывать нежелательные коды записей из кадра данных (путем фильтрации значения первого поля списка)
Создайте новые столбцы для «сортируемых» полей, прочитав индекс в столбце «список». - Сортируйте фрейм данных в новый файл с помощью pl. мойка_csv
Код: Выделить всё
import polars as pl
import os
# Polars tmp dir should be set to /data since /tmp does not have enough space
os.environ["POLARS_TEMP_DIR"] = "/data/testes/merger/tmp/"
pl.Config.set_verbose(True)
valid_record_codes = ["01", "02", "05", "06"]
sort_column_indexes = [1, 0]
ORIGINAL_ROW_COLUMN = "original_row"
ROW_AS_LIST_COLUMN = "row_as_list"
RECORD_CODE_COLUMN = "record_code"
SEPARATOR = ";"
# Read the input files
lf = pl.scan_csv(
"./part_*",
separator=chr(0000),
has_header=False,
new_columns=[ORIGINAL_ROW_COLUMN],
)
# Add a column on the dataframe for the record_code.
# As for now, the record code is always the first field of the dataframe
lf = lf.select(ORIGINAL_ROW_COLUMN).with_columns(
pl.col(ORIGINAL_ROW_COLUMN).str.split(SEPARATOR).alias(ROW_AS_LIST_COLUMN)
)
# Eliminate undesired records from the dataframe
lf = lf.with_columns(
pl.col(ROW_AS_LIST_COLUMN).list.get(0).alias(RECORD_CODE_COLUMN)
)
lf = lf.filter(pl.col(RECORD_CODE_COLUMN).is_in(valid_record_codes)).select(
pl.col(ORIGINAL_ROW_COLUMN), pl.col(ROW_AS_LIST_COLUMN)
)
sort_columns = list()
# Add the sortable columns to the LazyFrame
for sort_column in sort_column_indexes:
column_name = f"column_{sort_column}"
lf = lf.with_columns(
pl.col(ROW_AS_LIST_COLUMN)
.list.get(sort_column)
.alias(column_name)
)
sort_columns.append(column_name)
# Sort the dataframe
lf = lf.sort(sort_columns).select(ORIGINAL_ROW_COLUMN)
# Write the file
lf.sink_csv("output.csv", include_header=False)
Вот результат выполнения кода:
Код: Выделить всё
>>> lf.sink_csv("output.csv", include_header=False)
RUN STREAMING PIPELINE
[csv -> hstack -> hstack -> filter -> fast_projection -> hstack -> hstack -> sort_multiple -> fast_projection -> parquet_sink]
STREAMING CHUNK SIZE: 600000 rows
OOC sort started
Temporary directory path in use: /data/testes/merger/tmp/
STREAMING CHUNK SIZE: 600000 rows
finished sinking into OOC sort in 539.231142825s
full file dump of OOC sort took 539.631121035s
spill size: 0 mb
processing 1375 files
Killed
- После сообщение «STREAMING CHUNK SIZE: 600000 строк», использование памяти начинает расти, пока не достигнет пика в ~20 ГБ.
- После этого начинается создание временных файлов. На этом этапе использование памяти резко снижается (
Я также пытался преобразовать сортируемые поля в INT64, но в этом случае программа завершает работу еще до создания всех частичных файлов для sort.
Есть ли еще какие-либо параметры конфигурации, с которыми я мог бы поиграться, чтобы оптимизировать использование памяти?
Подробнее здесь: https://stackoverflow.com/questions/790 ... ith-polars