Как я могу принудительно освободить внутренне выделенную память, чтобы избежать накопления выделений, приводящего к MemoPython

Программы на Python
Ответить
Anonymous
 Как я могу принудительно освободить внутренне выделенную память, чтобы избежать накопления выделений, приводящего к Memo

Сообщение Anonymous »

Первый пост о stackoverflow, поэтому заранее извиняюсь, если нарушу правила, приветствуются отзывы. Я мог бы опубликовать сообщение на Staging Ground, мне следовало сделать это раньше, но эта проблема возникла слишком долго, и мне нужно скорое решение.
Я использую RHEL 8.9 со 128 ГБ оперативной памяти. Питон 3.11.5. Это для моей работы, поэтому я не могу копировать и вставлять код, но при необходимости могу воссоздать его для ясности. Я постараюсь описать код как можно лучше.
У меня есть программа на Python для обработки данных. Я ограничиваю. Он загружает файл csv + пару файлов json в фреймы данных. Размер CSV-файла составляет 3–5 ГБ и содержит более 35 000 000 строк, каждая строка представляет собой список из 3–10 целых чисел, он загружается в кадр данных, который мы назовем big_df. Общее количество выделенных ресурсов (VmRSS) после этой загрузки и некоторой базовой предварительной обработки составляет ~7,9 ГБ.
Затем я создаю 3 рабочих процесса с многопроцессорной обработкой.Pool(processes=3, maxtasksperchild=5), затем пул.map([рабочая функция], [аргументы для рабочей функции]). Каждый рабочий цикл обрабатывает более 1000 строк big_df с вложенным циклом (сам по себе 1000-5000 итеров), чтобы выполнить некоторые сравнения и добавить успешные сравнения в список результатов.
Я получаю MemoryError каждый раз после 1200-2500 задач во время травления списка результатов рабочего процесса, когда результат отправляется обратно в основной процесс. Журналирование показывает, что уровни VmRSS составляют ~47,8 ГБ, ~28 ГБ и ~26,5 ГБ для трех соответствующих рабочих процессов на момент возникновения ошибки MemoryError. Их уровни VmRSS постоянно повышаются на протяжении всего срока действия программы. Каждый из них начинается с одинакового VmRSS размером ~7,9 ГБ.
Что я пытался исправить MemoryError:
  • уменьшение аргумента [processes] с 6 до 3 или даже 2, чтобы меньше памяти использовалось для процессов и связанных с ними данных. Это задерживает ошибку памяти, но не предотвращает ее.
  • создание больших фреймов данных () глобальный, поскольку рабочие процессы только читают, а не записывают в них, поэтому это использует преимущества копирования при записи в Linux и позволяет избежать дублирования этих dfs.
  • используя аргумент maxtasksperchild=5, надеясь, что это приведет к сбросу каждого рабочего процесса и освобождению его памяти после каждых 5 запусков.
Нет удачи + на протяжении всего срока службы этих рабочих процессов VmRSS просто сохраняет растет и растет, т. е. нет никаких признаков того, что их память в какой-либо момент выделяется для ОС, она просто накапливается. Это меня не устраивает.
Поэтому я пытаюсь найти способы освободить эту память для ОС. Если другие пути решения лучше, пожалуйста, lmk. На данный момент мы обнаружили, что менеджер внутренней памяти Python обрабатывает все, и что освобождение памяти может быть гарантировано только в том случае, если вы завершите процесс. Я могу попробовать это, но может потребоваться серьезный рефакторинг (я занимаюсь этим уже две недели), я хочу внести минимальные изменения, так как я новый стажер, и до меня этот скрипт работал нормально. Сейчас у нас очень большие данные.
Изменить: некоторый псевдокод
основной процесс:

Код: Выделить всё

global big_df, smaller_df (loaded from csv/json respectively)
chunk_indices = [(start_idx, end_idx) for every 1000 rows of big_df (i.e. [(0,999),(1000,1999),...])
with multiprocessing.Pool(processes=3,maxtasks=5) as p:
result_lists = p.map(worker, chunk_indices)
p.close()
p.join()
create df from result_lists variable, write to output csv file
рабочие процессы(start_idx, end_idx):

Код: Выделить всё

def worker(args):
logging.basicConfig(filename="logfile.log",level=logging.DEBUG)
start_idx, end_idx = args
result = []
for i in range(start_idx, end_idx):
list_i = big_df[i]
for j in range(len(smaller_df)):
if len(list_i & smaller_df[j]):
result.append(smaller_df[j])
logging.debug("time: %s, sizeof results: %s", ctime(), result.__sizeof__())
return result
По данным журнала, этот массив результатов в рабочих процессах может достигать 9 ГБ, а часто составляет около 3–5 ГБ. Это не было бы проблемой, если бы память освобождалась после каждого запуска рабочего процесса.
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»