Проблемы с памятью при сортировке большего размера, чем файл памяти с полярамиPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Проблемы с памятью при сортировке большего размера, чем файл памяти с полярами

Сообщение Anonymous »

Все.
Я борюсь с проблемами с памятью при сортировке LazyFrame, превышающего объем памяти, с помощью поляров с помощью команды Skin_csv.
Я Я разрабатываю механизм, который сможет объединять и сортировать несколько больших наборов данных в виде отдельных файлов CSV.
Я работаю с двумя файлами по 6 ГБ для объединения и сортировки. . Я начал с модуля с 8 ГБ ОЗУ, но приложение аварийно завершает работу даже с 32 ГБ.
Примечание. Эти файлы не являются «обычными» CSV-файлами в том смысле, что у них есть собственный «заголовок». » и «трейлер» записывает его состав. Каждая строка идентифицируется «кодом записи» (первые 2 цифры). Различные типы записей могут иметь разное количество полей, поэтому я не могу просто прочитать весь файл с помощью простого символа «scan_csv» + разделитель.
Вот пример одного из этих файлов:< /p>

Код: Выделить всё

00;XPTO;99999991000197;20240905;130444;000001;20240905;130444;000001;20240901;0000
01;99900000001;F;1;0;000000321
01;00000000001;F;2;0;000000123
01;77700000003;F;0;0;000000999
01;22200000004;F;0;0;000000999
01;12300000004;F;0;0;000000999
99;00000000005;
Что делает мой код:
  • Создает LazyFrame со всеми частями, которые нужно объединить.
  • Создать столбец с исходной строкой «как есть», поскольку мне не нужно каким-либо образом менять его форматирование или содержимое в выходном файле.
  • Создать новый столбец, разделяющий исходную строку в списке с помощью предоставленного разделителя (;).
  • Отфильтровывать нежелательные коды записей из кадра данных (путем фильтрации значения первого поля списка)
    Создайте новые столбцы для «сортируемых» полей, прочитав индекс в столбце «список».
  • Сортируйте фрейм данных в новый файл с помощью pl. мойка_csv

Код: Выделить всё

import polars as pl
import os

# Polars tmp dir should be set to /data since /tmp does not have enough space
os.environ["POLARS_TEMP_DIR"] = "/data/testes/merger/tmp/"
pl.Config.set_verbose(True)

valid_record_codes = ["01", "02", "05", "06"]
sort_column_indexes = [1, 0]

ORIGINAL_ROW_COLUMN = "original_row"
ROW_AS_LIST_COLUMN = "row_as_list"
RECORD_CODE_COLUMN = "record_code"
SEPARATOR = ";"

# Read the input files
lf = pl.scan_csv(
"./part_*",
separator=chr(0000),
has_header=False,
new_columns=[ORIGINAL_ROW_COLUMN],
)

# Add a column on the dataframe for the record_code.
# As for now, the record code is always the first field of the dataframe
lf = lf.select(ORIGINAL_ROW_COLUMN).with_columns(
pl.col(ORIGINAL_ROW_COLUMN).str.split(SEPARATOR).alias(ROW_AS_LIST_COLUMN)
)

# Eliminate undesired records from the dataframe
lf = lf.with_columns(
pl.col(ROW_AS_LIST_COLUMN).list.get(0).alias(RECORD_CODE_COLUMN)
)

lf = lf.filter(pl.col(RECORD_CODE_COLUMN).is_in(valid_record_codes)).select(
pl.col(ORIGINAL_ROW_COLUMN), pl.col(ROW_AS_LIST_COLUMN)
)

sort_columns = list()

# Add the sortable columns to the LazyFrame
for sort_column in sort_column_indexes:
column_name = f"column_{sort_column}"
lf = lf.with_columns(
pl.col(ROW_AS_LIST_COLUMN)
.list.get(sort_column)
.alias(column_name)
)
sort_columns.append(column_name)

# Sort the dataframe
lf = lf.sort(sort_columns).select(ORIGINAL_ROW_COLUMN)

# Write the file
lf.sink_csv("output.csv", include_header=False)
Код отлично работает с небольшими файлами. Но с файлами большего размера поляры начинают потреблять МНОГО памяти, пока процесс Python не завершится.
Вот результат выполнения кода:

Код: Выделить всё

>>> lf.sink_csv("output.csv", include_header=False)
RUN STREAMING PIPELINE
[csv -> hstack -> hstack -> filter -> fast_projection -> hstack -> hstack -> sort_multiple -> fast_projection -> parquet_sink]
STREAMING CHUNK SIZE: 600000 rows
OOC sort started
Temporary directory path in use: /data/testes/merger/tmp/
STREAMING CHUNK SIZE: 600000 rows
finished sinking into OOC sort in 539.231142825s
full file dump of OOC sort took 539.631121035s
spill size: 0 mb
processing 1375 files
Killed

Я начал следить за использованием памяти и диска в этом модуле и вот что я заметил:
  • После сообщение «STREAMING CHUNK SIZE: 600000 строк», использование памяти начинает расти, пока не достигнет пика в ~20 ГБ.
  • После этого начинается создание временных файлов. На этом этапе использование памяти резко снижается (
Я уже пробовал уменьшить размер чанка в конфигурации поляров (с 600к до 50к). Это дает мне больше времени на выполнение, но ошибка все равно возникает.
Я также пытался преобразовать сортируемые поля в INT64, но в этом случае программа завершает работу еще до создания всех частичных файлов для sort.
Есть ли еще какие-либо параметры конфигурации, с которыми я мог бы поиграться, чтобы оптимизировать использование памяти?

Подробнее здесь: https://stackoverflow.com/questions/790 ... ith-polars
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»