Код: Выделить всё
from multiprocessing import Pool
from time import perf_counter as now
import numpy as np
def make_func(n=20000):
    """Build an (n, n) random matrix and return a lookup closure over it.

    The matrix side length is a parameter (default keeps the original
    20000) so the function can also be exercised on small inputs.
    The fixed seed makes the matrix reproducible across calls.

    Args:
        n: side length of the square random matrix.

    Returns:
        A function f(x, y) -> M[x, x] + M[y, y] closing over the matrix.
    """
    np.random.seed(6)
    M = np.random.rand(n, n)
    # Note: the closure keeps the whole matrix alive; under a fork-based
    # Pool the workers inherit it, under spawn it would be re-created.
    return lambda x, y: M[x, x] + M[y, y]
class ParallelProcessor:
    """Runs tasks over a multiprocessing Pool, sharing ``f`` via fork.

    ``f`` is published as a module-level global before the Pool is
    created so the forked workers inherit it (and the large matrix it
    closes over) without per-task pickling. NOTE(review): this relies on
    the 'fork' start method and will not work under 'spawn'.
    """

    def __init__(self):
        pass

    def process_task(self, args):
        """Unpack (index, integer_arg), evaluate f, print and return it.

        Fix: the original only printed the value and implicitly returned
        None, so run_parallel's ``results`` was a list of Nones even
        though it is returned to the caller; the computed value is now
        returned as well.
        """
        index, integer_arg = args
        result = f(index, integer_arg)
        print(result)
        return result

    def run_parallel(self, tasks, num_cores=None):
        """Build f once in the parent, then map the tasks over a Pool.

        Args:
            tasks: iterable of integer arguments; each is paired with
                its position to form (index, value) task tuples.
            num_cores: worker process count (None lets Pool pick
                os.cpu_count()).

        Returns:
            List of f(index, value) results, in task order.
        """
        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        start = now()
        # Global so forked workers inherit f instead of pickling it.
        global f
        f = make_func()
        print(f"************** {now() - start} seconds to make f")
        start = now()
        with Pool(num_cores) as pool:
            results = pool.map(self.process_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results
if __name__ == "__main__":
    # Entry point: run six integer tasks across six worker processes.
    runner = ParallelProcessor()
    runner.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)
В Python 3.14 метод запуска процессов в multiprocessing на Linux по умолчанию будет изменён с fork на spawn. Поэтому, готовясь к этому, я сделал версию с использованием spawn.
from multiprocessing import Pool, RawArray, set_start_method
from time import perf_counter as now
import numpy as np
def init_worker(shared_array_base, shape):
    """Pool initializer: expose the shared buffer in this worker.

    Wraps the raw shared-memory block in a zero-copy NumPy view of the
    given shape and stores it in the module-level global ``M`` for
    worker functions to read.
    """
    global M
    flat_view = np.frombuffer(shared_array_base, dtype=np.float64)
    M = flat_view.reshape(shape)
def worker_task(args):
    """Evaluate one task against the shared matrix.

    ``args`` is an (index, integer_arg) pair; returns
    M[index, index] + M[integer_arg, integer_arg], where the global
    ``M`` is the shared-memory view installed by init_worker.
    """
    i, j = args
    value = M[i, i] + M[j, j]
    print(value)
    return value
class ParallelProcessor:
    """Runs tasks under the 'spawn' start method with a shared matrix.

    The matrix lives in a multiprocessing RawArray passed to the Pool
    initializer, so spawned workers map the same memory instead of each
    rebuilding (or unpickling) their own copy.
    """

    def __init__(self):
        pass

    def run_parallel(self, tasks, num_cores=None, n=20000):
        """Run tasks in parallel using spawn and shared memory.

        Args:
            tasks: iterable of integer arguments; each is paired with
                its position to form (index, value) task tuples.
            num_cores: worker process count (None lets Pool pick
                os.cpu_count()).
            n: side length of the shared square matrix. Parameterized
                instead of hard-coded; the default preserves the
                original 20000 behavior.

        Returns:
            List of worker_task results, in task order.
        """
        set_start_method('spawn', force=True)  # Ensure 'spawn' is used
        shape = (n, n)
        # 'd' is a C double, i.e. float64 — matches the NumPy view below.
        shared_array_base = RawArray('d', n * n)
        M_local = np.frombuffer(shared_array_base, dtype=np.float64).reshape(shape)
        # Initialize the array in the main process.
        np.random.seed(7)
        start = now()
        # NOTE(review): np.random.rand(n, n) allocates a full temporary
        # n*n array that is then copied into the shared buffer, so peak
        # memory here is ~2x the matrix size. Filling the shared view in
        # place (e.g. np.random.default_rng(7).random(out=M_local)) would
        # avoid that extra copy.
        M_local[:] = np.random.rand(n, n)
        print(f"************** {now() - start} seconds to make M")
        # Prepare arguments for worker tasks.
        task_args = [(idx, val) for idx, val in enumerate(tasks)]
        start = now()
        with Pool(num_cores, initializer=init_worker, initargs=(shared_array_base, shape)) as pool:
            results = pool.map(worker_task, task_args)
        print(f"************** {now() - start} seconds to run all jobs")
        return results
if __name__ == "__main__":
    # Entry point: run six integer tasks across six spawned workers.
    runner = ParallelProcessor()
    runner.run_parallel(tasks=[1, 2, 3, 4, 5, 6], num_cores=6)
</code>
К сожалению, это не только медленнее, но и использует гораздо больше памяти. Может быть, она делает копию массива NumPy?
Подробнее здесь: https://stackoverflow.com/questions/794 ... processing