Проблема
У меня много данных, которые я сначала загружаю в оперативную память с помощью SharedMemory, а затем читаю со многими дочерними процессами с помощью multiprocessing.Pool.map< /code>.Код
Это упрощенная версия (не совсем пример жалобы), которую я использую:def SharedObject: # wrapper of SharedMemory passed to subprocesses
def __init__(self, blablabla):
self.shmem = SharedMemory(create=True, size=numbers) # reference to shmem will live as long as the instance of the class will (shmem not getting GC)
temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
temp_arr[:] = ...lot of data... # this array will get destroyed after __init__ finishes
def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
selected = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
return selected.__getitem__(indices)
# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
result= list(pool.map(f, shobj)) # f does shobj.__getitem__
Странное поведение
У меня на компьютере 64 ГБ оперативной памяти. Если я запускаю описанный выше алгоритм с небольшим количеством данных, все работает гладко, но если я загружаю много данных (~ 40 ГБ), я получаю следующую ошибку:n __enter__
return self._semlock.__enter__()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mnandcharvim\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 321, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(
(по сути, неправильное использование блокировки)
Данные доступны только для чтения, поэтому для меня было бы лучше, если бы я мог загрузить их в файл, доступный только для чтения. часть памяти, чтобы не было блокировок. Этот вопрос указывает на то, что SharedMemory не имеет блокировки, но на данный момент, судя по полученной ошибке, я не уверен).
2-я версия
Я также попытался привести код в соответствие с примером официальной документации, на которую я дал ссылку:shmems = [] # module variable (same module of SharedObject) as suggested in some answers
def SharedObject:
def __init__(self, blablabla):
shmem = SharedMemory(name=self.name, create=True, size=numbers) # this reference destroyed after __init__
shmems.append(shmem) # but this one outlives it
temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
temp_arr[:] = ...lot of data...
def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
shmem = SharedMemory(name=self.name) # added line
selected = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
return selected.__getitem__(indices)
# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
result= list(pool.map(f, shobj)) # f does shobj.__getitem__
Второе странное поведение
Эта вторая версия выдает мне ошибку: shmem = SharedMemory(name=self.name) # добавлена строка, говорящая что мне не хватает системных ресурсов для выполнения mmap (странно, ведь данные уже были отображены в оперативной памяти: ресурсов хватило на якобы первую и единоразовую загрузку).О чем следует помнить
- Обработка данных предполагает только их чтение
- I не могу использовать потоки (поскольку при чтении контента я выполняю вычисления, не освобождающие GIL).
- Я просто хочу знать четкий, простой и функциональный способ, для которого я должен был бы использовать SharedMemory в сочетание с массивами numpy и многопроцессорной обработкой.
- Мне нужно читать данные как массивы numpy
Чтобы правильно воспроизвести аргументы, настройте их в зависимости от ресурсов вашей системы, чтобы не переполнять оперативную память. Пример заходит в тупик на одной из основных итераций цикла for. Нажатие CTRL+C отображает ошибку.
Я использую 64-разрядную версию Python 3.12.2 в Windows 10.
script.py:
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import multiprocessing
class SharedObject:
def __init__(self):
self.shape = (2000000, 2049)
self.shmem = SharedMemory(create=True, size=2000000*8196)
self.dtype = np.float32
_temp_arr=np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
_temp_arr[:] = [np.zeros(shape=2049, dtype=self.dtype) for j in range(2000000)]
def f(data: SharedObject) -> int:
print("hello")
return 5
if __name__ == "__main__":
shobj = SharedObject()
for j in range(20):
print(j)
with multiprocessing.Pool() as pool:
list(pool.map(f, [shobj]*256)) # 256 calls to f
Подробнее здесь: https://stackoverflow.com/questions/790 ... ng-probelm
Мобильная версия