Код: Выделить всё
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# `x` is referenced `n_loop` times.
def kernel(x, n_loop):
return sum((np.sin(np.cos(np.sin(np.cos(x + i)))).sum()
for i in range(n_loop)))
def worker(group, n_loop):
return sum(kernel(g, n_loop) for g in group)
if __name__ == '__main__':
vsize = 1000
n = 100000
np.random.seed(42)
X = [np.random.uniform(size = vsize) for _ in range(n)]
n_core = os.cpu_count()
dx = len(X) // n_core + 1
Xgroups = []
for i in range(0, len(X), dx):
Xgroups.append(X[i:min(i+dx, len(X))])
import time, mtmp
n_loop = 1
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}\n")
n_loop = 10
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
Код: Выделить всё
python3.14t mtmp.py
Код: Выделить всё
Inner loops 1, multithreading time: 0.68 sec, result sum: 66993532.26748964
Inner loops 1, multiprocessing time: 0.95 sec, result sum: 66993532.26748964
Inner loops 10, multithreading time: 6.51 sec, result sum: 717434683.1879174
Inner loops 10, multiprocessing time: 2.20 sec, result sum: 717434683.1879174
Возможно ли или будет ли когда-нибудь возможно (возможно, в будущей версии со свободным потоком?) сказать Python: «Эй, я никогда не буду изменять этот объект в ядре, пожалуйста, пометьте его как неизменяемый или защищенный и игнорируйте любые попытки синхронизации, или просто выдавайте исключения, если вы обнаружите попытку мутации, например Series.to_numpy() от Polars?
Многопроцессорная обработка хороша до тех пор, пока данные, необходимые многим работникам, не станут слишком сложными и большими, чтобы их можно было скопировать в пространство памяти каждого процесса.
Подробнее здесь: https://stackoverflow.com/questions/798 ... ded-python
Мобильная версия