Код: Выделить всё
import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# `x` is referenced `n_loop` times.
def kernel(x, n_loop):
return sum((np.sin(np.cos(np.sin(np.cos(x + i)))).sum()
for i in range(n_loop)))
def worker(group, n_loop):
return sum(kernel(g, n_loop) for g in group)
if __name__ == '__main__':
vsize = 1000
n = 100000
np.random.seed(42)
X = [np.random.uniform(size = vsize) for _ in range(n)]
n_core = os.cpu_count()
dx = len(X) // n_core + 1
Xgroups = []
for i in range(0, len(X), dx):
Xgroups.append(X[i:min(i+dx, len(X))])
import time, mtmp
n_loop = 1
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}\n")
n_loop = 10
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
Код: Выделить всё
python3.14t mtmp.py
Код: Выделить всё
Inner loops 1, multithreading time: 0.68 sec, result sum: 66993532.26748964
Inner loops 1, multiprocessing time: 0.95 sec, result sum: 66993532.26748964
Inner loops 10, multithreading time: 6.51 sec, result sum: 717434683.1879174
Inner loops 10, multiprocessing time: 2.20 sec, result sum: 717434683.1879174
Возможно ли или будет ли когда-нибудь возможно (возможно, в будущей версии со свободным потоком?) сказать Python: «Эй, я никогда не буду изменять этот объект в ядре, пожалуйста, пометьте его как неизменяемый или защищенный и игнорируйте любые попытки синхронизации, или просто выдавайте исключения, если вы обнаружите попытку мутации, например Series.to_numpy()?
Многопроцессорная обработка хороша до тех пор, пока данные, необходимые многим работникам, не станут слишком сложными и большими, чтобы их можно было скопировать в пространство памяти каждого процесса.
Обновление: нашел ноутбук с Windows (8-ядерный процессор Intel) и вот тесты:
Код: Выделить всё
python3.14t mtmp.py
Inner loops 1, multithreading time: 1.26 sec, result sum: 66993532.26748968
Inner loops 1, multiprocessing time: 4.51 sec, result sum: 66993532.26748968
Inner loops 10, multithreading time: 8.02 sec, result sum: 717434683.1879174
Inner loops 10, multiprocessing time: 5.97 sec, result sum: 717434683.1879174
Обновление II: пользователь @ngoldbaum в комментариях упомянул, что причина может быть связана с numpy, а не с конкуренцией потоков из-за подсчета ссылок. Таким образом, я заменил массив numpy собственным списком Python в функции ядра. Ниже приведен полный сценарий тестирования mtmp_list.py:
Код: Выделить всё
# python3.14t mtmp_list.py
import numpy as np
import os, math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# `x` is referenced `n_loop` times.
def kernel(x, n_loop):
return sum(sum(math.sin(math.cos(math.sin(math.cos(xx + i))))
for i, xx in enumerate(x)) for _ in range(n_loop))
def worker(group, n_loop):
return sum(kernel(g, n_loop) for g in group)
if __name__ == '__main__':
vsize = 100
n = 100000
np.random.seed(42)
X = [list(np.random.uniform(size = vsize)) for _ in range(n)]
n_core = os.cpu_count()
dx = len(X) // n_core + 1
Xgroups = []
for i in range(0, len(X), dx):
Xgroups.append(X[i:min(i+dx, len(X))])
import time, mtmp_list
n_loop = 1
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}\n")
n_loop = 20
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
Код: Выделить всё
Inner loops 1, multithreading time: 0.22 sec, result sum: 7228790.8732207455
Inner loops 1, multiprocessing time: 8.10 sec, result sum: 7228790.8732207455
Inner loops 20, multithreading time: 4.61 sec, result sum: 144575817.46441492
Inner loops 20, multiprocessing time: 10.66 sec, result sum: 144575817.46441492
Код: Выделить всё
nПодробнее здесь: https://stackoverflow.com/questions/798 ... ded-python
Мобильная версия