Я вычисляю данные, используя возможности многопроцессорной обработки. «Рабочий» скрипт помещает свои результаты в очередь, которую я затем легко читаю в основной программе с помощью:
result=[]
while not Q.empty():
result.append(Q.get())
Но если у меня в очереди много данных, я подумал, что с таким же успехом можно использовать одно или два ядра моего процессора, чтобы начать чтение очереди, пока рабочие процессы продолжают работать. Я нашел здесь фрагмент кода: Как использовать очередь многопроцессорной обработки в Python? это создает процессы «считывания», непрерывно читающие очередь, пока рабочий не сообщит им, что вычисление окончено. Это работает нормально, но программа чтения только читает очередь и ничего не возвращает. Как мне на самом деле получить данные из очереди, чтобы использовать их в моей основной программе? Единственное решение, которое я нашел, — создать список с помощью multiprocess.Manager() и передать его в качестве аргумента читателю. Это работает, но занимает много времени!! Это полностью убивает время выполнения моей программы, поэтому первый метод (чтение очереди непосредственно в основной программе) намного лучше. Есть ли у меня другое решение? Действительно ли возможно получить в основной программе данные из очереди, которые я читаю в отдельных процессах?
Ниже пример кода, построенный из различных кусков кода, собранных тут и там:
import multiprocessing as mp
from datetime import datetime
def worker(numbers, start, end, qu):
"""A worker function to calculate squares of numbers."""
res=[]
for i in range(start, end):
res.append(numbers * numbers)
qu.put(res)
def reader(q, outputlist):
"""Read from the queue; this spawns as a separate Process"""
#returnedlist=[]
while True:
msg = q.get() # Read from the queue and do nothing
if msg == "DONE":
break
[outputlist.append(x) for x in msg] # comment out if using 1st method (reading queue in the main program)
return
def start_reader_procs(q, num_of_reader_procs, L):
"""Start the reader processes and return all in a list to the caller
source:
https://stackoverflow.com/questions/115 ... -in-python"""
all_reader_procs = list()
for ii in range(0, num_of_reader_procs):
### reader_p() reads from qq as a separate process...
### you can spawn as many reader_p() as you like
### however, there is usually a point of diminishing returns
reader_p = mp.Process(target=reader, args=((q),L,))
reader_p.daemon = True
reader_p.start() # Launch reader_p() as another proc
all_reader_procs.append(reader_p)
return all_reader_procs
def main(core_count):
numbers = range(50000) # A larger range for a more evident effect of multiprocessing
segment = len(numbers) // core_count
processes = []
#Q = mp.Queue()
m = mp.Manager()
Q = m.Queue()
# comment out if using 1st method (reading queue in the main program)
#----------------------------------
num_of_reader_procs=2
L = m.list()
all_reader_procs = start_reader_procs(Q, num_of_reader_procs, L)
#-----------------------------------------
for i in range(core_count):
start = i * segment
if i == core_count - 1:
end = len(numbers) # Ensure the last segment goes up to the end
else:
end = start + segment
# Creating a process for each segment
p = mp.Process(target=worker, args=(numbers, start, end, Q))
processes.append(p)
p.start()
for p in processes:
p.join()
print("All worker processes terminated")
# comment out if using 1st method (reading queue in the main program)
#----------------------------------
### Tell all readers to stop...
for ii in range(0, num_of_reader_procs):
Q.put("DONE")
for idx, a_reader_proc in enumerate(all_reader_procs):
print(" Waiting for reader_p.join() index %s" % idx)
a_reader_proc.join() # Wait for a_reader_proc() to finish
print(" reader_p() idx:%s is done" % idx)
result=list(L)
#----------------------------------
# result=[]
# while not Q.empty():
# result.append(Q.get())
# result = [x for L in result for x in L] # flatten the list of lists
return result
if __name__ == '__main__':
for core_count in [1, 2, 4]:
starttime = datetime.now()
print(f"Using {core_count} core(s):")
result = main(core_count)
print(f"First 10 squares: {list(result)[:10]}") # Display the first 10 results as a sample
endtime = datetime.now()
print ("Total computation time : {:.1f} sec".format((endtime-starttime).total_seconds()))
print()
Подробнее здесь: https://stackoverflow.com/questions/791 ... -processes
Возвращать данные в основную программу из очереди, которую я читаю в отдельных процессах ⇐ Python
Программы на Python
1731512066
Anonymous
Я вычисляю данные, используя возможности многопроцессорной обработки. «Рабочий» скрипт помещает свои результаты в очередь, которую я затем легко читаю в основной программе с помощью:
result=[]
while not Q.empty():
result.append(Q.get())
Но если у меня в очереди много данных, я подумал, что с таким же успехом можно использовать одно или два ядра моего процессора, чтобы начать чтение очереди, пока рабочие процессы продолжают работать. Я нашел здесь фрагмент кода: Как использовать очередь многопроцессорной обработки в Python? это создает процессы «считывания», непрерывно читающие очередь, пока рабочий не сообщит им, что вычисление окончено. Это работает нормально, но программа чтения только читает очередь и ничего не возвращает. Как мне на самом деле получить данные из очереди, чтобы использовать их в моей основной программе? Единственное решение, которое я нашел, — создать список с помощью multiprocess.Manager() и передать его в качестве аргумента читателю. Это работает, но занимает много времени!! Это полностью убивает время выполнения моей программы, поэтому первый метод (чтение очереди непосредственно в основной программе) намного лучше. Есть ли у меня другое решение? Действительно ли возможно получить в основной программе данные из очереди, которые я читаю в отдельных процессах?
Ниже пример кода, построенный из различных кусков кода, собранных тут и там:
import multiprocessing as mp
from datetime import datetime
def worker(numbers, start, end, qu):
"""A worker function to calculate squares of numbers."""
res=[]
for i in range(start, end):
res.append(numbers[i] * numbers[i])
qu.put(res)
def reader(q, outputlist):
"""Read from the queue; this spawns as a separate Process"""
#returnedlist=[]
while True:
msg = q.get() # Read from the queue and do nothing
if msg == "DONE":
break
[outputlist.append(x) for x in msg] # comment out if using 1st method (reading queue in the main program)
return
def start_reader_procs(q, num_of_reader_procs, L):
"""Start the reader processes and return all in a list to the caller
source:
https://stackoverflow.com/questions/11515944/how-to-use-multiprocessing-queue-in-python"""
all_reader_procs = list()
for ii in range(0, num_of_reader_procs):
### reader_p() reads from qq as a separate process...
### you can spawn as many reader_p() as you like
### however, there is usually a point of diminishing returns
reader_p = mp.Process(target=reader, args=((q),L,))
reader_p.daemon = True
reader_p.start() # Launch reader_p() as another proc
all_reader_procs.append(reader_p)
return all_reader_procs
def main(core_count):
numbers = range(50000) # A larger range for a more evident effect of multiprocessing
segment = len(numbers) // core_count
processes = []
#Q = mp.Queue()
m = mp.Manager()
Q = m.Queue()
# comment out if using 1st method (reading queue in the main program)
#----------------------------------
num_of_reader_procs=2
L = m.list()
all_reader_procs = start_reader_procs(Q, num_of_reader_procs, L)
#-----------------------------------------
for i in range(core_count):
start = i * segment
if i == core_count - 1:
end = len(numbers) # Ensure the last segment goes up to the end
else:
end = start + segment
# Creating a process for each segment
p = mp.Process(target=worker, args=(numbers, start, end, Q))
processes.append(p)
p.start()
for p in processes:
p.join()
print("All worker processes terminated")
# comment out if using 1st method (reading queue in the main program)
#----------------------------------
### Tell all readers to stop...
for ii in range(0, num_of_reader_procs):
Q.put("DONE")
for idx, a_reader_proc in enumerate(all_reader_procs):
print(" Waiting for reader_p.join() index %s" % idx)
a_reader_proc.join() # Wait for a_reader_proc() to finish
print(" reader_p() idx:%s is done" % idx)
result=list(L)
#----------------------------------
# result=[]
# while not Q.empty():
# result.append(Q.get())
# result = [x for L in result for x in L] # flatten the list of lists
return result
if __name__ == '__main__':
for core_count in [1, 2, 4]:
starttime = datetime.now()
print(f"Using {core_count} core(s):")
result = main(core_count)
print(f"First 10 squares: {list(result)[:10]}") # Display the first 10 results as a sample
endtime = datetime.now()
print ("Total computation time : {:.1f} sec".format((endtime-starttime).total_seconds()))
print()
Подробнее здесь: [url]https://stackoverflow.com/questions/79185151/return-data-to-the-main-program-from-a-queue-that-i-read-in-separate-processes[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия