Многопоточность становится намного медленнее, чем многопоточность в Python со свободным потоком.Python

Программы на Python
Ответить
Anonymous
 Многопоточность становится намного медленнее, чем многопоточность в Python со свободным потоком.

Сообщение Anonymous »

Рассмотрим следующий исполняемый скрипт Python mtmp.py:

Код: Выделить всё

import numpy as np
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# `x` is referenced `n_loop` times.
def kernel(x, n_loop):
return sum((np.sin(np.cos(np.sin(np.cos(x + i)))).sum()
for i in range(n_loop)))

def worker(group, n_loop):
return sum(kernel(g, n_loop) for g in group)

if __name__ == '__main__':
vsize = 1000
n = 100000
np.random.seed(42)

X = [np.random.uniform(size = vsize) for _ in range(n)]
n_core = os.cpu_count()
dx = len(X) // n_core + 1
Xgroups = []
for i in range(0, len(X), dx):
Xgroups.append(X[i:min(i+dx, len(X))])

import time, mtmp
n_loop = 1
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop},  multithreading  time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")

st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop},  multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}\n")

n_loop = 10
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading  time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")

st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
Запуск скрипта через (mac, 14 ядер)

Код: Выделить всё

python3.14t mtmp.py
выдал следующее:

Код: Выделить всё

Inner loops 1,  multithreading  time: 0.68 sec, result sum: 66993532.26748964
Inner loops 1,  multiprocessing time: 0.95 sec, result sum: 66993532.26748964

Inner loops 10, multithreading  time: 6.51 sec, result sum: 717434683.1879174
Inner loops 10, multiprocessing time: 2.20 sec, result sum: 717434683.1879174
Я пришел с C++ и был так взволнован Python 3.14t, пока не почувствовал трудности с подсчетом ссылок. Очевидно, если функция ядра хоть немного сложна, например, обращается к некоторым объектам Python несколько раз или обращается к нескольким объектам, многопоточность может стать существенно медленнее, чем многопроцессорность. Это имеет смысл, поскольку правильный refCount потребует атомарности или блокировки каждый раз, когда объект ссылается, что приводит к остановке приведенного выше многопоточного примера.
Возможно ли или будет ли когда-нибудь возможно (возможно, в будущей версии со свободным потоком?) сказать Python: «Эй, я никогда не буду изменять этот объект в ядре, пожалуйста, пометьте его как неизменяемый или защищенный и игнорируйте любые попытки синхронизации, или просто выдавайте исключения, если вы обнаружите попытку мутации, например Series.to_numpy()?
Многопроцессорная обработка хороша до тех пор, пока данные, необходимые многим работникам, не станут слишком сложными и большими, чтобы их можно было скопировать в пространство памяти каждого процесса.
Обновление: нашел ноутбук с Windows (8-ядерный процессор Intel) и вот тесты:

Код: Выделить всё

python3.14t mtmp.py

Inner loops 1,  multithreading  time: 1.26 sec, result sum: 66993532.26748968
Inner loops 1,  multiprocessing time: 4.51 sec, result sum: 66993532.26748968

Inner loops 10, multithreading  time: 8.02 sec, result sum: 717434683.1879174
Inner loops 10, multiprocessing time: 5.97 sec, result sum: 717434683.1879174
«Проблема» все еще актуальна.
Обновление II: пользователь @ngoldbaum в комментариях упомянул, что причина может быть связана с numpy, а не с конкуренцией потоков из-за подсчета ссылок. Таким образом, я заменил массив numpy собственным списком Python в функции ядра. Ниже приведен полный сценарий тестирования mtmp_list.py:

Код: Выделить всё

# python3.14t mtmp_list.py

import numpy as np
import os, math
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

# `x` is referenced `n_loop` times.
def kernel(x, n_loop):
return sum(sum(math.sin(math.cos(math.sin(math.cos(xx + i))))
for i, xx in enumerate(x)) for _ in range(n_loop))

def worker(group, n_loop):
return sum(kernel(g, n_loop) for g in group)

if __name__ == '__main__':
vsize = 100
n = 100000
np.random.seed(42)

X = [list(np.random.uniform(size = vsize)) for _ in range(n)]
n_core = os.cpu_count()
dx = len(X) // n_core + 1
Xgroups = []
for i in range(0, len(X), dx):
Xgroups.append(X[i:min(i+dx, len(X))])

import time, mtmp_list
n_loop = 1
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop},  multithreading  time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")

st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop},  multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}\n")

n_loop = 20
st = time.time()
with ThreadPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multithreading  time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")

st = time.time()
with ProcessPoolExecutor(max_workers = n_core) as pool:
futures = [pool.submit(mtmp_list.worker, g, n_loop) for g in Xgroups]
workerRsts = [future.result() for future in futures]
print(f"Inner loops {n_loop}, multiprocessing time: { \
time.time() - st:.2f} sec, result sum: {sum(workerRsts)}")
Результаты:

Код: Выделить всё

Inner loops 1,  multithreading  time: 0.22 sec, result sum: 7228790.8732207455
Inner loops 1,  multiprocessing time: 8.10 sec, result sum: 7228790.8732207455

Inner loops 20, multithreading  time: 4.61 sec, result sum: 144575817.46441492
Inner loops 20, multiprocessing time: 10.66 sec, result sum: 144575817.46441492

Похоже, @ngoldbaum прав. Синхронизация потоков по счетчику ссылок все еще может быть одной из причин, но что-то другое (скорее всего, numpy), очевидно, играет здесь доминирующую роль. Я тестировал другие конфигурации (, vsize, n_loop), а многопоточность последовательно и часто намного быстрее, чем многопроцессорность. Затраты времени на многопоточность также почти идеально линейно масштабируются с увеличением n_loop, что указывает на то, что любые другие накладные расходы незначительны.

Подробнее здесь: https://stackoverflow.com/questions/798 ... ded-python
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»