When using multiprocessing in Python, increasing the ratio of consumers to producers never stops the queues from growing
Here's what I am trying to do: the producers generate a value (basically by iterative hashing) that is then hash-mapped to "bins". The producer places each number into two bins. The consumers are each in charge of a fixed address range of the bins. The producer places the random number into the work queue of the consumer in charge of the appropriate address range (doing so for both bins the number is mapped to). The consumer takes jobs off its queue, performs a modular exponentiation using them, and then moves on to the next job.
My problem comes in when running tests to determine what ratio of consumers to producers I need in order to keep the queues from constantly growing. I found that a 1:7 ratio of producers to consumers kept the queues from growing, but a 2:14 ratio did not. In fact, there did not seem to be any reasonable point at which the consumers could do enough work to stop the number of items in the queues from growing linearly. I did find that adding consumers slowed the growth of the queues, but never stopped it. Now, I do not necessarily expect to be able to double the producers and simply double the consumers too, but I did expect to find a ratio that was not 2 versus hundreds. To clarify, I'm running this code in the cloud, so I can ensure that there is a CPU available to run each process individually.
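For reference, here is a rough, self-contained timing sketch comparing the per-item cost on each side: one 3072-bit modular exponentiation with an ~86-bit exponent (one consumer job) versus one BLAKE2b hash plus one primality test on an ~86-bit candidate (roughly one iteration of the producer's search loop). The constants mirror the sizes used in my code below, but the loop counts and stand-in values here are just for illustration, not my actual test harness.

```python
import hashlib
import time
from random import getrandbits, randint

from gmpy2 import is_prime, mpz, powmod

N = mpz(getrandbits(3072)) | 1      # stand-in for one table modulus (forced non-zero/odd)
A = mpz(randint(2, 2**3072 - 1))    # stand-in for one table base
e = mpz(getrandbits(86))            # ~86-bit exponent, like rnd_val

t0 = time.perf_counter()
for _ in range(1000):
    powmod(A, e, N)                 # one consumer job
consumer_cost = (time.perf_counter() - t0) / 1000

t0 = time.perf_counter()
for _ in range(1000):
    h = hashlib.blake2b(getrandbits(128).to_bytes(16, 'little'), digest_size=11)
    is_prime(mpz(int.from_bytes(h.digest(), 'little')) | 1)   # roughly one producer search iteration
producer_cost = (time.perf_counter() - t0) / 1000

print(f'one consumer job:       {consumer_cost * 1e6:.1f} us')
print(f'one producer iteration: {producer_cost * 1e6:.1f} us')
print(f'cost ratio (consumer / producer iteration): {consumer_cost / producer_cost:.1f}')
```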
Here is how I am creating the processes and the functions that they are being given. I'm using the multiprocessing library for creating and handling the processes and queues.
```python
import hashlib
import math
import sys
import time
from multiprocessing import Process, JoinableQueue, cpu_count
from gmpy2 import mpz, powmod, is_prime, bit_set
from random import randint, getrandbits
from secrets import token_urlsafe
from queue import Empty


class Blake2b(object):
    def __init__(self, output_bit_length=64, salt=None, optimized64=False):
        self.output_bit_length = output_bit_length
        digest_size = math.ceil(output_bit_length / 8)  # convert to bytes
        if salt is None:
            self.hash = hashlib.blake2b(digest_size=digest_size)
        else:
            self.hash = hashlib.blake2b(digest_size=digest_size, salt=salt)
        self.output_modulus = mpz(2**(self.output_bit_length - 1))  # integer double the size of the bit length - 1 (so double -2)

    def generating_value(self, preimage):
        # Iteratively hash until the derived candidate is prime
        h = self.hash.copy()
        h.update(preimage)
        current_digest = h.digest()
        while True:
            U = int.from_bytes(current_digest, sys.byteorder) % self.output_modulus
            candidate = bit_set(self.output_modulus + U, 0)
            if is_prime(candidate):
                return candidate
            h.update(b'0')
            current_digest = h.digest()

    def KeyGen(self, preimage, size):
        # Hash the preimage down to a bin index in [0, size)
        h = self.hash.copy()
        h.update(preimage)
        digest = h.hexdigest()
        return int(digest, 16) % size


def task(bins, task_queue, ident, f):
    # Consumer: pull (value, bin) jobs off this process's queue and do a modular exponentiation
    while True:
        try:
            next_task = task_queue.get(timeout=0.5)
        except Empty:
            print('Consumer: gave up waiting...', file=f, flush=True)
            continue
        if next_task is None:  # Poison pill tells it to shutdown
            print(f'Consumer {ident}: Exiting', flush=True, file=f)
            task_queue.task_done()
            break
        value, in_bin = next_task
        N, A = bins[in_bin]
        answer = powmod(A, value, N)
        bins[in_bin][1] = answer
        task_queue.task_done()
    return


def add_to_queue(tableSize, num_consumers, task_queues, f):
    # Producer: generate values and route each one to the two queues owning its bins
    bit_length = 86  # The security parameter
    b = Blake2b(output_bit_length=bit_length)
    leftKey = Blake2b(output_bit_length=10, salt=b'left')
    rightKey = Blake2b(output_bit_length=10, salt=b'right')
    bins_per_process = tableSize // num_consumers
    while True:
        rnd_val = b.generating_value(token_urlsafe(16).encode('utf-8'))
        KeyLeft = leftKey.KeyGen(str(rnd_val).encode('utf-8'), tableSize)
        KeyRight = rightKey.KeyGen(str(rnd_val).encode('utf-8'), tableSize)
        task_queues[KeyLeft // bins_per_process].put([rnd_val, KeyLeft % bins_per_process])
        task_queues[KeyRight // bins_per_process].put([rnd_val, KeyRight % bins_per_process])
    return


if __name__ == '__main__':
    num_cpus = cpu_count()
    num_consumers = 16
    num_producers = 2
    table_size = num_consumers * 8  # how many bins total
    with open("qGrowthTests.txt", "a") as f:
        table = [[getrandbits(3072), randint(2, 2**3072 - 1)] for _ in range(table_size)]
        bins_lst = [table[(i * len(table)) // num_consumers:((i + 1) * len(table)) // num_consumers]
                    for i in range(num_consumers)]
        task_queues = [JoinableQueue() for _ in range(num_consumers)]

        producers = []
        for i in range(num_producers):
            producer = Process(target=add_to_queue, args=(table_size, num_consumers, task_queues, f))
            producers.append(producer)
            producer.start()

        consumers = []
        for queue_num, queue in enumerate(task_queues, start=0):
            process = Process(target=task, args=(bins_lst[queue_num], queue, queue_num, f))
            consumers.append(process)
            process.start()

        # checking the size of the queues once every 5 seconds for a 5 minute test
        n = 1
        while n < 60:  # 5 minutes = 300 seconds, 300/5 = 60
            time.sleep(5)
            for q_num, q in enumerate(task_queues, start=0):
                print("Queue ", q_num, " has: ", q.qsize(), file=f)
            n += 1

        # Force all the processes to finish
        for process in producers:
            if process.is_alive():
                print(f'Producer {process}: Exiting', flush=True, file=f)
                process.terminate()
                process.join()

        for process in consumers:
            if process.is_alive():
                print(f'Consumer {process}: Exiting', flush=True, file=f)
                process.terminate()
                process.join()

        print('Test Complete')
```

When I tested the time it takes to do a set amount of work (giving 2 producers a number of jobs, timing them, then having the consumers work through the queues), I found that the consumers halve the time they take to do the same work when I double their number (expected behaviour). For instance, 4 consumers take half the time of 2 consumers, 8 consumers half the time of 4, and so on. Extrapolating from this, a 2:16 ratio should be enough for the consumers to work through the tasks as fast as the producers create them. However, when I run the code for five minutes, with the producers doing as many jobs as they can in that time and 16 or more consumers, I still see linear growth of the queues. For example, when running with 16 consumers the final sizes of all the queues range from 450,000 to 470,000 items, so all fairly close in size.
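One thing that would make this comparison more direct than extrapolating from fixed-workload timings is logging actual rates during the live run: how many items per second the producers put versus how fast the backlog grows. A rough sketch, assuming the producers are also handed a shared multiprocessing.Value counter that they increment on every put; put_counter and sample_queues are illustrative names, not part of my code above.

```python
import time
from multiprocessing import Value

# Producers would be passed this counter and do, after each pair of puts:
#     with put_counter.get_lock():
#         put_counter.value += 2
put_counter = Value('L', 0)


def sample_queues(task_queues, put_counter, interval=5.0, samples=60, f=None):
    """Print arrival rate, consumption rate and total backlog once per interval."""
    prev_backlog = sum(q.qsize() for q in task_queues)
    prev_puts = put_counter.value
    for _ in range(samples):
        time.sleep(interval)
        backlog = sum(q.qsize() for q in task_queues)
        puts = put_counter.value
        arrival_rate = (puts - prev_puts) / interval        # items/s put by the producers
        growth_rate = (backlog - prev_backlog) / interval   # net items/s left behind
        service_rate = arrival_rate - growth_rate           # items/s actually consumed
        print(f'arrival={arrival_rate:.0f}/s consumed={service_rate:.0f}/s backlog={backlog}',
              file=f, flush=True)
        prev_backlog, prev_puts = backlog, puts
```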
I've checked that the producers are handing out even amounts of work to all of the consumers, and that items really are being removed from the queues. I've also tried concurrent.futures process pools and a Manager to handle the queues; while all of those processes do finish at the same time, it seems to me that there is overhead forcing that outcome, because they take longer to perform the same number of tasks, and when I check CPU load most processes sit at around 30% while the code is running, whereas with the way I create the processes above they are at 100%.
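For what it's worth, those per-process load numbers can also be logged from the parent rather than eyeballed in top. A small sketch, assuming the third-party psutil package is available; log_cpu is just an illustrative helper, not something from my code above.

```python
import psutil  # third-party; pip install psutil


def log_cpu(procs, f=None):
    # Sample each child's CPU use over half a second (100% == one fully busy core).
    for p in procs:
        if p.is_alive():
            cpu = psutil.Process(p.pid).cpu_percent(interval=0.5)
            print(f'{p.name} (pid {p.pid}): {cpu:.0f}% CPU', file=f, flush=True)


# e.g. inside the monitoring loop: log_cpu(producers + consumers, f)
```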
I've tried to figure out what might be wrong with my code, but I can't see what is so different about 2 producers that they create more work than even 42 consumers can keep up with. I think there might be something fundamental about how multiprocessing works that I'm missing, but I can't see what. Thoughts?
Source: https://stackoverflow.com/questions/781 ... -to-produc