Проблема многопроцессорности массива с общей памятьюPython

Программы на Python
Ответить
Anonymous
 Проблема многопроцессорности массива с общей памятью

Сообщение Anonymous »

Проблема

У меня много данных, которые я сначала загружаю в оперативную память с помощью SharedMemory, а затем читаю со многими дочерними процессами с помощью multiprocessing.Pool.map< /code>.

Код

Это упрощенная версия (не совсем пример жалобы), которую я использую:
def SharedObject: # wrapper of SharedMemory passed to subprocesses
def __init__(self, blablabla):
self.shmem = SharedMemory(create=True, size=numbers) # reference to shmem will live as long as the instance of the class will (shmem not getting GC)
temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
temp_arr[:] = ...lot of data... # this array will get destroyed after __init__ finishes

def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
selected = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
return selected.__getitem__(indices)

# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
result= list(pool.map(f, shobj)) # f does shobj.__getitem__

Странное поведение

У меня на компьютере 64 ГБ оперативной памяти. Если я запускаю описанный выше алгоритм с небольшим количеством данных, все работает гладко, но если я загружаю много данных (~ 40 ГБ), я получаю следующую ошибку:
n __enter__
return self._semlock.__enter__()
^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\mnandcharvim\AppData\Local\Programs\Python\Python312\Lib\multiprocessing\connection.py", line 321, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(

(по сути, неправильное использование блокировки)
Данные доступны только для чтения, поэтому для меня было бы лучше, если бы я мог загрузить их в файл, доступный только для чтения. часть памяти, чтобы не было блокировок. Этот вопрос указывает на то, что SharedMemory не имеет блокировки, но на данный момент, судя по полученной ошибке, я не уверен).

2-я версия

Я также попытался привести код в соответствие с примером официальной документации, на которую я дал ссылку:
shmems = [] # module variable (same module of SharedObject) as suggested in some answers
def SharedObject:
def __init__(self, blablabla):
shmem = SharedMemory(name=self.name, create=True, size=numbers) # this reference destroyed after __init__
shmems.append(shmem) # but this one outlives it
temp_arr = np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
temp_arr[:] = ...lot of data...

def __getitem__(self, indices) -> np.ndarray: # used by subprocesses
shmem = SharedMemory(name=self.name) # added line
selected = np.ndarray(self.shape, dtype=self.dtype, buffer=shmem.buf)
return selected.__getitem__(indices)

# This is in main process
shobj = SharedObject()
with multiprocessing.Pool() as pool:
result= list(pool.map(f, shobj)) # f does shobj.__getitem__

Второе странное поведение

Эта вторая версия выдает мне ошибку: shmem = SharedMemory(name=self.name) # добавлена ​​строка, говорящая что мне не хватает системных ресурсов для выполнения mmap (странно, ведь данные уже были отображены в оперативной памяти: ресурсов хватило на якобы первую и единоразовую загрузку).

О чем следует помнить

  • Обработка данных предполагает только их чтение
  • I не могу использовать потоки (поскольку при чтении контента я выполняю вычисления, не освобождающие GIL).
  • Я просто хочу знать четкий, простой и функциональный способ, для которого я должен был бы использовать SharedMemory в сочетание с массивами numpy и многопроцессорной обработкой.
  • Мне нужно читать данные как массивы numpy
Минимальный воспроизводимый пример< /h4>
Чтобы правильно воспроизвести аргументы, настройте их в зависимости от ресурсов вашей системы, чтобы не переполнять оперативную память. Пример заходит в тупик на одной из основных итераций цикла for. Нажатие CTRL+C отображает ошибку.
Я использую 64-разрядную версию Python 3.12.2 в Windows 10.
script.py:
from multiprocessing.shared_memory import SharedMemory
import numpy as np
import multiprocessing

class SharedObject:
def __init__(self):
self.shape = (2000000, 2049)
self.shmem = SharedMemory(create=True, size=2000000*8196)
self.dtype = np.float32
_temp_arr=np.ndarray(self.shape, dtype=self.dtype, buffer=self.shmem.buf)
_temp_arr[:] = [np.zeros(shape=2049, dtype=self.dtype) for j in range(2000000)]

def f(data: SharedObject) -> int:
print("hello")
return 5

if __name__ == "__main__":
shobj = SharedObject()

for j in range(20):
print(j)
with multiprocessing.Pool() as pool:
list(pool.map(f, [shobj]*256)) # 256 calls to f


Подробнее здесь: https://stackoverflow.com/questions/790 ... ng-probelm
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»