У меня есть 6 процессов, каждый из них будет потребителем и 1 процесс производителем. производитель прочитает путь к файлу в многопроцессную очередь. и у потребителя будет два потока: один для чтения файла изprocess_queue и помещения данных в thread_queue, а другой поток будет выполнять вычисления и помещать результат в result_queue(который представляет собой очередь процесса). Проблема в том, что я обнаружил, что каждый рабочий процесс доходит до конца, но программа застревает в worker.join() в основном потоке, я не знаю почему, я в замешательстве.
Ниже мой код
Cores = 8
def main():
start_time = time.perf_counter()
process_q = multiprocessing.Queue()
result_q = multiprocessing.Queue()
gather_process = multiprocessing.Process(target=get_file_path, args=(process_q,))
gather_process.start()
worker_process = []
for i in range(Cores - 2):
worker = multiprocessing.Process(target=mul_heatmap_worker, args=(process_q,result_q))
worker.start()
worker_process.append(worker)
print(f"process start{worker}")
print("all process started")
gather_process.join()
print("gather is done")
for i in range(Cores - 2):
print("before sending None to process")
process_q.put(None)
print("sending None to process")
#process_q.close()
#process_q.join_thread()
#result_q
for i in range(Cores - 2):
print(i)
worker = worker_process
worker.join()
print("one process is done")
print("all process is done")
mask_shape = (1024 + 1, 2048 + 1)
total_mask = np.zeros(mask_shape)
while not result_q.empty():
total_mask += result_q.get().get_HeatMape_plt()
#HM.finalize()
#one_heatmap()
end_time = time.perf_counter()
duration = end_time - start_time
#print(f"the Total bbox are {total_pds_bbox}")
print(f"the duration of the execution is {duration}")
#print(f"time = {execution_time}")
#to_Array(df,["person", "rider"])
def get_file_path(iner_process_q):
total_files = 0
for dirpath, dirnames, filenames in os.walk(Cityscapes_Fine_GT_Path):
global total_json_read
for filename in filenames:
if filename.endswith('json'):
total_files += 1
file_path = os.path.join(dirpath, filename)
iner_process_q.put(file_path)
print("gather is done in method")
'''Two thread are using for Cityscapes dataset evaluation'''
def mul_heatmap_worker(iner_process_q, iner_result_q):
#current 46 sec with I/O thread
thread_q = Queue()
io_thread = threading.Thread(target=read_json_files_to_df, args=(iner_process_q,thread_q))
print("io start")
io_thread.start()
print("processing")
process_thread = threading.Thread(target=sta_Cityscapes, args=(iner_result_q,thread_q))
process_thread.start()
#converter_thread = threading.Thread(target=HM.convert_to_gpu)
#converter_thread.start()
io_thread.join()
print("io is done")
thread_q.put(None)
process_thread.join()
print("processing is done")
#HM.show_HeatMape_plt()
#print(total_json_read)
'''Thread for reading the json file to a DataFrame and puting it to json queue, save I/O time'''
def read_json_files_to_df(iner_process_q, iner_thread_q):
while True:
#get from main queue
file_path = iner_process_q.get()
if file_path is None:
break
df = pd.read_json(file_path)
iner_thread_q.put(df)
#total_json_read += 1
def sta_Cityscapes(iner_result_q,iner_thread_q):
HM_for_worker = HeatMaper(width=2048, height=1024)
while True:
df = iner_thread_q.get()
#global total_pds_bbox
#The last one in the queue will be None
if df is None:
print("read the None")
break
pds = findPedstrain(df, ["person", "rider"])
if pds.size > 0:
pdlocations = pds['objects'].apply(lambda x : x['polygon'])
for alist in pdlocations:
points = np.array(alist)
HM_for_worker.add_to_mask(points)
#total_pds_bbox += 1
iner_result_q.put(HM_for_worker)
print("puting HM")
Подробнее здесь: https://stackoverflow.com/questions/791 ... never-stop
Моя программа на Python с многопроцессорностью и многопоточностью никогда не остановится ⇐ Python
Программы на Python
1730806965
Anonymous
У меня есть 6 процессов, каждый из них будет потребителем и 1 процесс производителем. производитель прочитает путь к файлу в многопроцессную очередь. и у потребителя будет два потока: один для чтения файла изprocess_queue и помещения данных в thread_queue, а другой поток будет выполнять вычисления и помещать результат в result_queue(который представляет собой очередь процесса). Проблема в том, что я обнаружил, что каждый рабочий процесс доходит до конца, но программа застревает в worker.join() в основном потоке, я не знаю почему, я в замешательстве.
Ниже мой код
Cores = 8
def main():
start_time = time.perf_counter()
process_q = multiprocessing.Queue()
result_q = multiprocessing.Queue()
gather_process = multiprocessing.Process(target=get_file_path, args=(process_q,))
gather_process.start()
worker_process = []
for i in range(Cores - 2):
worker = multiprocessing.Process(target=mul_heatmap_worker, args=(process_q,result_q))
worker.start()
worker_process.append(worker)
print(f"process start{worker}")
print("all process started")
gather_process.join()
print("gather is done")
for i in range(Cores - 2):
print("before sending None to process")
process_q.put(None)
print("sending None to process")
#process_q.close()
#process_q.join_thread()
#result_q
for i in range(Cores - 2):
print(i)
worker = worker_process[i]
worker.join()
print("one process is done")
print("all process is done")
mask_shape = (1024 + 1, 2048 + 1)
total_mask = np.zeros(mask_shape)
while not result_q.empty():
total_mask += result_q.get().get_HeatMape_plt()
#HM.finalize()
#one_heatmap()
end_time = time.perf_counter()
duration = end_time - start_time
#print(f"the Total bbox are {total_pds_bbox}")
print(f"the duration of the execution is {duration}")
#print(f"time = {execution_time}")
#to_Array(df,["person", "rider"])
def get_file_path(iner_process_q):
total_files = 0
for dirpath, dirnames, filenames in os.walk(Cityscapes_Fine_GT_Path):
global total_json_read
for filename in filenames:
if filename.endswith('json'):
total_files += 1
file_path = os.path.join(dirpath, filename)
iner_process_q.put(file_path)
print("gather is done in method")
'''Two thread are using for Cityscapes dataset evaluation'''
def mul_heatmap_worker(iner_process_q, iner_result_q):
#current 46 sec with I/O thread
thread_q = Queue()
io_thread = threading.Thread(target=read_json_files_to_df, args=(iner_process_q,thread_q))
print("io start")
io_thread.start()
print("processing")
process_thread = threading.Thread(target=sta_Cityscapes, args=(iner_result_q,thread_q))
process_thread.start()
#converter_thread = threading.Thread(target=HM.convert_to_gpu)
#converter_thread.start()
io_thread.join()
print("io is done")
thread_q.put(None)
process_thread.join()
print("processing is done")
#HM.show_HeatMape_plt()
#print(total_json_read)
'''Thread for reading the json file to a DataFrame and puting it to json queue, save I/O time'''
def read_json_files_to_df(iner_process_q, iner_thread_q):
while True:
#get from main queue
file_path = iner_process_q.get()
if file_path is None:
break
df = pd.read_json(file_path)
iner_thread_q.put(df)
#total_json_read += 1
def sta_Cityscapes(iner_result_q,iner_thread_q):
HM_for_worker = HeatMaper(width=2048, height=1024)
while True:
df = iner_thread_q.get()
#global total_pds_bbox
#The last one in the queue will be None
if df is None:
print("read the None")
break
pds = findPedstrain(df, ["person", "rider"])
if pds.size > 0:
pdlocations = pds['objects'].apply(lambda x : x['polygon'])
for alist in pdlocations:
points = np.array(alist)
HM_for_worker.add_to_mask(points)
#total_pds_bbox += 1
iner_result_q.put(HM_for_worker)
print("puting HM")
Подробнее здесь: [url]https://stackoverflow.com/questions/79158889/my-python-program-with-multiprocess-and-multithreading-will-never-stop[/url]
Ответить
1 сообщение
• Страница 1 из 1
Перейти
- Кемерово-IT
- ↳ Javascript
- ↳ C#
- ↳ JAVA
- ↳ Elasticsearch aggregation
- ↳ Python
- ↳ Php
- ↳ Android
- ↳ Html
- ↳ Jquery
- ↳ C++
- ↳ IOS
- ↳ CSS
- ↳ Excel
- ↳ Linux
- ↳ Apache
- ↳ MySql
- Детский мир
- Для души
- ↳ Музыкальные инструменты даром
- ↳ Печатная продукция даром
- Внешняя красота и здоровье
- ↳ Одежда и обувь для взрослых даром
- ↳ Товары для здоровья
- ↳ Физкультура и спорт
- Техника - даром!
- ↳ Автомобилистам
- ↳ Компьютерная техника
- ↳ Плиты: газовые и электрические
- ↳ Холодильники
- ↳ Стиральные машины
- ↳ Телевизоры
- ↳ Телефоны, смартфоны, плашеты
- ↳ Швейные машинки
- ↳ Прочая электроника и техника
- ↳ Фототехника
- Ремонт и интерьер
- ↳ Стройматериалы, инструмент
- ↳ Мебель и предметы интерьера даром
- ↳ Cантехника
- Другие темы
- ↳ Разное даром
- ↳ Давай меняться!
- ↳ Отдам\возьму за копеечку
- ↳ Работа и подработка в Кемерове
- ↳ Давай с тобой поговорим...
Мобильная версия