Возвращать данные в основную программу из очереди, которую я читаю в отдельных процессахPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Возвращать данные в основную программу из очереди, которую я читаю в отдельных процессах

Сообщение Anonymous »

Я вычисляю данные, используя возможности многопроцессорной обработки. «Рабочий» скрипт помещает свои результаты в очередь, которую я затем легко читаю в основной программе с помощью:
result=[]
while not Q.empty():
result.append(Q.get())

Но если у меня в очереди много данных, я подумал, что с таким же успехом можно использовать одно или два ядра моего процессора, чтобы начать чтение очереди, пока рабочие процессы продолжают работать. Я нашел здесь фрагмент кода: Как использовать очередь многопроцессорной обработки в Python? это создает процессы «считывания», непрерывно читающие очередь, пока рабочий не сообщит им, что вычисление окончено. Это работает нормально, но программа чтения только читает очередь и ничего не возвращает. Как мне на самом деле получить данные из очереди, чтобы использовать их в моей основной программе? Единственное решение, которое я нашел, — создать список с помощью multiprocess.Manager() и передать его в качестве аргумента читателю. Это работает, но занимает много времени!! Это полностью убивает время выполнения моей программы, поэтому первый метод (чтение очереди непосредственно в основной программе) намного лучше. Есть ли у меня другое решение? Действительно ли возможно получить в основной программе данные из очереди, которые я читаю в отдельных процессах?
Ниже пример кода, построенный из различных кусков кода, собранных тут и там:
import multiprocessing as mp
from datetime import datetime

def worker(numbers, start, end, qu):
"""A worker function to calculate squares of numbers."""
res=[]
for i in range(start, end):
res.append(numbers * numbers)
qu.put(res)

def reader(q, outputlist):
"""Read from the queue; this spawns as a separate Process"""
#returnedlist=[]
while True:
msg = q.get() # Read from the queue and do nothing
if msg == "DONE":
break
[outputlist.append(x) for x in msg] # comment out if using 1st method (reading queue in the main program)
return

def start_reader_procs(q, num_of_reader_procs, L):
"""Start the reader processes and return all in a list to the caller
source:
https://stackoverflow.com/questions/115 ... -in-python"""

all_reader_procs = list()
for ii in range(0, num_of_reader_procs):
### reader_p() reads from qq as a separate process...
### you can spawn as many reader_p() as you like
### however, there is usually a point of diminishing returns
reader_p = mp.Process(target=reader, args=((q),L,))
reader_p.daemon = True
reader_p.start() # Launch reader_p() as another proc

all_reader_procs.append(reader_p)

return all_reader_procs

def main(core_count):
numbers = range(50000) # A larger range for a more evident effect of multiprocessing
segment = len(numbers) // core_count
processes = []
#Q = mp.Queue()
m = mp.Manager()
Q = m.Queue()

# comment out if using 1st method (reading queue in the main program)
#----------------------------------
num_of_reader_procs=2
L = m.list()
all_reader_procs = start_reader_procs(Q, num_of_reader_procs, L)
#-----------------------------------------

for i in range(core_count):
start = i * segment
if i == core_count - 1:
end = len(numbers) # Ensure the last segment goes up to the end
else:
end = start + segment
# Creating a process for each segment
p = mp.Process(target=worker, args=(numbers, start, end, Q))
processes.append(p)
p.start()

for p in processes:
p.join()

print("All worker processes terminated")

# comment out if using 1st method (reading queue in the main program)
#----------------------------------
### Tell all readers to stop...
for ii in range(0, num_of_reader_procs):
Q.put("DONE")
for idx, a_reader_proc in enumerate(all_reader_procs):
print(" Waiting for reader_p.join() index %s" % idx)
a_reader_proc.join() # Wait for a_reader_proc() to finish
print(" reader_p() idx:%s is done" % idx)
result=list(L)
#----------------------------------

# result=[]
# while not Q.empty():
# result.append(Q.get())
# result = [x for L in result for x in L] # flatten the list of lists

return result

if __name__ == '__main__':

for core_count in [1, 2, 4]:

starttime = datetime.now()
print(f"Using {core_count} core(s):")
result = main(core_count)
print(f"First 10 squares: {list(result)[:10]}") # Display the first 10 results as a sample
endtime = datetime.now()
print ("Total computation time : {:.1f} sec".format((endtime-starttime).total_seconds()))
print()


Подробнее здесь: https://stackoverflow.com/questions/791 ... -processes
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»