Я использую RHEL 8.9 со 128 ГБ оперативной памяти. Питон 3.11.5. Это для моей работы, поэтому я не могу копировать и вставлять код, но при необходимости могу воссоздать его для ясности. Я постараюсь описать код как можно лучше.
У меня есть программа на Python для обработки данных. Я ограничиваю. Он загружает файл csv + пару файлов json в фреймы данных. Размер CSV-файла составляет 3–5 ГБ и содержит более 35 000 000 строк, каждая строка представляет собой список из 3–10 целых чисел, он загружается в кадр данных, который мы назовем big_df. Общее количество выделенных ресурсов (VmRSS) после этой загрузки и некоторой базовой предварительной обработки составляет ~7,9 ГБ.
Затем я создаю 3 рабочих процесса с многопроцессорной обработкой.Pool(processes=3, maxtasksperchild=5), затем пул.map([рабочая функция], [аргументы для рабочей функции]). Каждый рабочий цикл обрабатывает более 1000 строк big_df с вложенным циклом (сам по себе 1000-5000 итеров), чтобы выполнить некоторые сравнения и добавить успешные сравнения в список результатов.
Я получаю MemoryError каждый раз после 1200-2500 задач во время травления списка результатов рабочего процесса, когда результат отправляется обратно в основной процесс. Журналирование показывает, что уровни VmRSS составляют ~47,8 ГБ, ~28 ГБ и ~26,5 ГБ для трех соответствующих рабочих процессов на момент возникновения ошибки MemoryError. Их уровни VmRSS постоянно повышаются на протяжении всего срока действия программы. Каждый из них начинается с одинакового VmRSS размером ~7,9 ГБ.
Что я пытался исправить MemoryError:
- уменьшение аргумента [processes] с 6 до 3 или даже 2, чтобы меньше памяти использовалось для процессов и связанных с ними данных. Это задерживает ошибку памяти, но не предотвращает ее.
- создание больших фреймов данных () глобальный, поскольку рабочие процессы только читают, а не записывают в них, поэтому это использует преимущества копирования при записи в Linux и позволяет избежать дублирования этих dfs.
Код: Выделить всё
big_df - используя аргумент maxtasksperchild=5, надеясь, что это приведет к сбросу каждого рабочего процесса и освобождению его памяти после каждых 5 запусков.
Поэтому я пытаюсь найти способы освободить эту память для ОС. Если другие пути решения лучше, пожалуйста, lmk. На данный момент мы обнаружили, что менеджер внутренней памяти Python обрабатывает все, и что освобождение памяти может быть гарантировано только в том случае, если вы завершите процесс. Я могу попробовать это, но может потребоваться серьезный рефакторинг (я занимаюсь этим уже две недели), я хочу внести минимальные изменения, так как я новый стажер, и до меня этот скрипт работал нормально. Сейчас у нас очень большие данные.
Изменить: некоторый псевдокод
основной процесс:
Код: Выделить всё
global big_df, smaller_df (loaded from csv/json respectively)
chunk_indices = [(start_idx, end_idx) for every 1000 rows of big_df (i.e. [(0,999),(1000,1999),...])
with multiprocessing.Pool(processes=3,maxtasks=5) as p:
result_lists = p.map(worker, chunk_indices)
p.close()
p.join()
create df from result_lists variable, write to output csv file
Код: Выделить всё
def worker(args):
logging.basicConfig(filename="logfile.log",level=logging.DEBUG)
start_idx, end_idx = args
result = []
for i in range(start_idx, end_idx):
list_i = big_df[i]
for j in range(len(smaller_df)):
if len(list_i & smaller_df[j]):
result.append(smaller_df[j])
logging.debug("time: %s, sizeof results: %s", ctime(), result.__sizeof__())
return result
Мобильная версия