Мне нужно декодировать несколько больших строк base64 размером от нескольких сотен МБ до ~5 ГБ каждая.
Очевидное решение — один вызов base64.b64decode ("эталонный вариант").
Я пытаюсь ускорить процесс с помощью многопроцессорности, но, что удивительно, он намного медленнее эталонной реализации.
На моей машине я получаю:
reference_implementation
decoding time = 7.37
implmementation1
Verify result Ok
decoding time = 7.59
threaded_impl
Verify result Ok
decoding time = 13.24
mutiproc_impl
Verify result Ok
decoding time = 11.82
Что я делаю неправильно?
(Внимание: код требует много памяти!)
import base64
from time import perf_counter
from binascii import a2b_base64
import concurrent.futures as fut
from time import sleep
from gc import collect
from multiprocessing import cpu_count
def reference_implementation(encoded):
    """Baseline: decode the whole payload with a single b64decode call."""
    decoded = base64.b64decode(encoded)
    return decoded
def implmementation1(encoded):
    """Decode by calling binascii.a2b_base64 directly, skipping the
    thin base64.b64decode wrapper."""
    decoded = a2b_base64(encoded)
    return decoded
def threaded_impl(encoded, N):
    """Decode base64 *encoded* bytes using a pool of N threads.

    The input is split into N chunks whose lengths (except possibly the
    last) are multiples of 4, so each chunk is independently valid base64.
    Assumes the input carries no padding except possibly at the very end.

    NOTE(review): threads do not help here — the GIL serializes the
    Python-level overhead and the chunks must still be copied back
    together, which is why this benchmarks slower than one a2b_base64.

    :param encoded: base64-encoded ``bytes`` to decode
    :param N: number of chunks / worker threads (must be >= 1)
    :return: the decoded ``bytes``
    """
    # Length of each of the first N-1 chunks, rounded down to a multiple
    # of 4 so every split boundary falls between base64 quanta.
    lbatch = (len(encoded) // N // 4) * 4
    batches = []
    # Bug fix: with N == 1 the loop below never runs, so `end` must be
    # pre-initialized or the final slice raises NameError.
    end = 0
    for i in range(N - 1):
        start = i * lbatch
        end = start + lbatch
        batches.append(encoded[start:end])
    batches.append(encoded[end:])  # remainder (whole input when N == 1)
    with fut.ThreadPoolExecutor(max_workers=N) as executor:
        # b"".join is O(total); the original `ret = ret + result` loop
        # re-copied the accumulator on every iteration (quadratic).
        return b"".join(executor.map(a2b_base64, batches))
def mutiproc_impl(encoded, N):
    """Decode base64 *encoded* bytes using a pool of N worker processes.

    The input is split into N chunks whose lengths (except possibly the
    last) are multiples of 4, so each chunk is independently valid base64.
    Assumes the input carries no padding except possibly at the very end.

    NOTE(review): the chunks and results are pickled across process
    boundaries, so the IPC copy cost dominates and this benchmarks
    slower than a single-process a2b_base64 call.

    :param encoded: base64-encoded ``bytes`` to decode
    :param N: number of chunks / worker processes (must be >= 1)
    :return: the decoded ``bytes``
    """
    # Length of each of the first N-1 chunks, rounded down to a multiple
    # of 4 so every split boundary falls between base64 quanta.
    lbatch = (len(encoded) // N // 4) * 4
    batches = []
    # Bug fix: with N == 1 the loop below never runs, so `end` must be
    # pre-initialized or the final slice raises NameError.
    end = 0
    for i in range(N - 1):
        start = i * lbatch
        end = start + lbatch
        batches.append(encoded[start:end])
    batches.append(encoded[end:])  # remainder (whole input when N == 1)
    with fut.ProcessPoolExecutor(max_workers=N) as executor:
        # b"".join is O(total); the original `ret = ret + result` loop
        # re-copied the accumulator on every iteration (quadratic).
        return b"".join(executor.map(a2b_base64, batches))
if __name__ == "__main__":
    CPU_NUM = cpu_count()

    def _bench(label, func, *args, expected=None):
        """Time one decoder run; verify against *expected* when given."""
        print(f"\n{label}")
        t_start = perf_counter()
        result = func(*args)
        t_end = perf_counter()
        if expected is not None:
            print("Verify result", "Ok" if result == expected else "FAIL")
        print('decoding time =', (t_end - t_start))
        return result

    # Prepare a 4.6 GB byte string (with less than 32 GB RAM you may
    # experience swapping on virtual memory). The literal is sized so it
    # encodes without padding bytes; any 4-aligned split is valid base64.
    repeat = 60000000
    large_b64_string = b'VGhpcyBzdHJpbmcgaXMgZm9ybWF0dGVkIHRvIGJlIGVuY29kZWQgd2l0aG91dCBwYWRkaW5nIGJ5dGVz' * repeat

    # Compare implementations; dec1 is the ground truth the others must match.
    dec1 = _bench("reference_implementation", reference_implementation,
                  large_b64_string)
    sleep(1)
    dec2 = _bench("implmementation1", implmementation1,
                  large_b64_string, expected=dec1)
    del dec2; collect()  # force freeing memory to avoid swapping on virtual mem
    sleep(1)
    dec3 = _bench("threaded_impl", threaded_impl,
                  large_b64_string, CPU_NUM, expected=dec1)
    del dec3; collect()
    sleep(1)
    dec4 = _bench("mutiproc_impl", mutiproc_impl,
                  large_b64_string, CPU_NUM, expected=dec1)
    del dec4; collect()
Подробнее здесь: https://stackoverflow.com/questions/790 ... threaded-p
Почему декодирование большой строки base64 происходит быстрее в однопоточных процессах, чем в многопоточных? ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение