Я вычисляю данные, используя возможности многопроцессорной обработки. «Рабочий» скрипт помещает свои результаты в очередь, которую я затем легко читаю в основной программе с помощью:
result=[]
while not Q.empty():
result.append(Q.get())
Но если у меня в очереди много данных, я подумал, что с таким же успехом можно использовать одно или два ядра моего процессора, чтобы начать чтение очереди, пока рабочие процессы продолжают работать. Я нашел здесь фрагмент кода: Как использовать очередь многопроцессорной обработки в Python? это создает процессы «считывания», непрерывно читающие очередь, пока рабочий не сообщит им, что вычисление окончено. Это работает нормально, но программа чтения только читает очередь и ничего не возвращает. Как мне на самом деле получить данные из очереди, чтобы использовать их в моей основной программе? Единственное решение, которое я нашел, — создать список с помощью multiprocess.Manager() и передать его в качестве аргумента читателю. Это работает, но занимает много времени!! Это полностью убивает время выполнения моей программы, поэтому первый метод (чтение очереди непосредственно в основной программе) намного лучше. Есть ли у меня другое решение? Действительно ли возможно получить в основной программе данные из очереди, которые я читаю в отдельных процессах?
Ниже пример кода, построенный из различных кусков кода, собранных тут и там:
import multiprocessing as mp
from datetime import datetime
def worker(numbers, start, end, qu):
"""A worker function to calculate squares of numbers."""
res=[]
for i in range(start, end):
res.append(numbers * numbers)
qu.put(res)
def reader(q, outputlist):
"""Read from the queue; this spawns as a separate Process"""
#returnedlist=[]
while True:
msg = q.get() # Read from the queue and do nothing
if msg == "DONE":
break
[outputlist.append(x) for x in msg] # comment out if using 1st method (reading queue in the main program)
return
def start_reader_procs(q, num_of_reader_procs, L):
"""Start the reader processes and return all in a list to the caller
source:
https://stackoverflow.com/questions/115 ... -in-python"""
all_reader_procs = list()
for ii in range(0, num_of_reader_procs):
### reader_p() reads from qq as a separate process...
### you can spawn as many reader_p() as you like
### however, there is usually a point of diminishing returns
reader_p = mp.Process(target=reader, args=((q),L,))
reader_p.daemon = True
reader_p.start() # Launch reader_p() as another proc
all_reader_procs.append(reader_p)
return all_reader_procs
def main(core_count):
numbers = range(50000) # A larger range for a more evident effect of multiprocessing
segment = len(numbers) // core_count
processes = []
#Q = mp.Queue()
m = mp.Manager()
Q = m.Queue()
# comment out if using 1st method (reading queue in the main program)
#----------------------------------
num_of_reader_procs=2
L = m.list()
all_reader_procs = start_reader_procs(Q, num_of_reader_procs, L)
#-----------------------------------------
for i in range(core_count):
start = i * segment
if i == core_count - 1:
end = len(numbers) # Ensure the last segment goes up to the end
else:
end = start + segment
# Creating a process for each segment
p = mp.Process(target=worker, args=(numbers, start, end, Q))
processes.append(p)
p.start()
for p in processes:
p.join()
print("All worker processes terminated")
# comment out if using 1st method (reading queue in the main program)
#----------------------------------
### Tell all readers to stop...
for ii in range(0, num_of_reader_procs):
Q.put("DONE")
for idx, a_reader_proc in enumerate(all_reader_procs):
print(" Waiting for reader_p.join() index %s" % idx)
a_reader_proc.join() # Wait for a_reader_proc() to finish
print(" reader_p() idx:%s is done" % idx)
result=list(L)
#----------------------------------
# result=[]
# while not Q.empty():
# result.append(Q.get())
# result = [x for L in result for x in L] # flatten the list of lists
return result
if __name__ == '__main__':
for core_count in [1, 2, 4]:
starttime = datetime.now()
print(f"Using {core_count} core(s):")
result = main(core_count)
print(f"First 10 squares: {list(result)[:10]}") # Display the first 10 results as a sample
endtime = datetime.now()
print ("Total computation time : {:.1f} sec".format((endtime-starttime).total_seconds()))
print()
Подробнее здесь: https://stackoverflow.com/questions/791 ... -processes
Возвращать данные в основную программу из очереди, которую я читаю в отдельных процессах ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Возвращать данные в основную программу из очереди, которую я читаю в отдельных процессах
Anonymous » » в форуме Python - 0 Ответы
- 11 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Возвращать данные в основную программу из очереди, которую я читаю в отдельных процессах
Anonymous » » в форуме Python - 0 Ответы
- 5 Просмотры
-
Последнее сообщение Anonymous
-