У меня есть 6 процессов, каждый из них будет потребителем и 1 процесс производителем. производитель прочитает путь к файлу в многопроцессную очередь. и у потребителя будет два потока: один для чтения файла изprocess_queue и помещения данных в thread_queue, а другой поток будет выполнять вычисления и помещать результат в result_queue(который представляет собой очередь процесса). Проблема в том, что я обнаружил, что каждый рабочий процесс доходит до конца, но программа застревает в worker.join() в основном потоке, я не знаю почему, я в замешательстве.
Ниже мой код
Cores = 8
def main():
start_time = time.perf_counter()
process_q = multiprocessing.Queue()
result_q = multiprocessing.Queue()
gather_process = multiprocessing.Process(target=get_file_path, args=(process_q,))
gather_process.start()
worker_process = []
for i in range(Cores - 2):
worker = multiprocessing.Process(target=mul_heatmap_worker, args=(process_q,result_q))
worker.start()
worker_process.append(worker)
print(f"process start{worker}")
print("all process started")
gather_process.join()
print("gather is done")
for i in range(Cores - 2):
print("before sending None to process")
process_q.put(None)
print("sending None to process")
#process_q.close()
#process_q.join_thread()
#result_q
for i in range(Cores - 2):
print(i)
worker = worker_process
worker.join()
print("one process is done")
print("all process is done")
mask_shape = (1024 + 1, 2048 + 1)
total_mask = np.zeros(mask_shape)
while not result_q.empty():
total_mask += result_q.get().get_HeatMape_plt()
#HM.finalize()
#one_heatmap()
end_time = time.perf_counter()
duration = end_time - start_time
#print(f"the Total bbox are {total_pds_bbox}")
print(f"the duration of the execution is {duration}")
#print(f"time = {execution_time}")
#to_Array(df,["person", "rider"])
def get_file_path(iner_process_q):
total_files = 0
for dirpath, dirnames, filenames in os.walk(Cityscapes_Fine_GT_Path):
global total_json_read
for filename in filenames:
if filename.endswith('json'):
total_files += 1
file_path = os.path.join(dirpath, filename)
iner_process_q.put(file_path)
print("gather is done in method")
'''Two thread are using for Cityscapes dataset evaluation'''
def mul_heatmap_worker(iner_process_q, iner_result_q):
#current 46 sec with I/O thread
thread_q = Queue()
io_thread = threading.Thread(target=read_json_files_to_df, args=(iner_process_q,thread_q))
print("io start")
io_thread.start()
print("processing")
process_thread = threading.Thread(target=sta_Cityscapes, args=(iner_result_q,thread_q))
process_thread.start()
#converter_thread = threading.Thread(target=HM.convert_to_gpu)
#converter_thread.start()
io_thread.join()
print("io is done")
thread_q.put(None)
process_thread.join()
print("processing is done")
#HM.show_HeatMape_plt()
#print(total_json_read)
'''Thread for reading the json file to a DataFrame and puting it to json queue, save I/O time'''
def read_json_files_to_df(iner_process_q, iner_thread_q):
while True:
#get from main queue
file_path = iner_process_q.get()
if file_path is None:
break
df = pd.read_json(file_path)
iner_thread_q.put(df)
#total_json_read += 1
def sta_Cityscapes(iner_result_q,iner_thread_q):
HM_for_worker = HeatMaper(width=2048, height=1024)
while True:
df = iner_thread_q.get()
#global total_pds_bbox
#The last one in the queue will be None
if df is None:
print("read the None")
break
pds = findPedstrain(df, ["person", "rider"])
if pds.size > 0:
pdlocations = pds['objects'].apply(lambda x : x['polygon'])
for alist in pdlocations:
points = np.array(alist)
HM_for_worker.add_to_mask(points)
#total_pds_bbox += 1
iner_result_q.put(HM_for_worker)
print("puting HM")
Подробнее здесь: https://stackoverflow.com/questions/791 ... never-stop
Моя программа на Python с многопроцессорностью и многопоточностью никогда не остановится ⇐ Python
-
- Похожие темы
- Ответы
- Просмотры
- Последнее сообщение
-
-
Избегайте ошибок во время выполнения, вызванных многопроцессорностью в Windows
Anonymous » » в форуме Python - 0 Ответы
- 27 Просмотры
-
Последнее сообщение Anonymous
-
-
-
Cover.py с многопроцессорностью работает только с unittest, но не с pytest?
Anonymous » » в форуме Python - 0 Ответы
- 12 Просмотры
-
Последнее сообщение Anonymous
-