Моя программа на Python с многопроцессорностью и многопоточностью никогда не остановитсяPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Моя программа на Python с многопроцессорностью и многопоточностью никогда не остановится

Сообщение Anonymous »

У меня есть 6 процессов, каждый из них будет потребителем и 1 процесс производителем. производитель прочитает путь к файлу в многопроцессную очередь. и у потребителя будет два потока: один для чтения файла изprocess_queue и помещения данных в thread_queue, а другой поток будет выполнять вычисления и помещать результат в result_queue(который представляет собой очередь процесса). Проблема в том, что я обнаружил, что каждый рабочий процесс доходит до конца, но программа застревает в worker.join() в основном потоке, я не знаю почему, я в замешательстве.
Ниже мой код
Cores = 8
def main():
start_time = time.perf_counter()

process_q = multiprocessing.Queue()
result_q = multiprocessing.Queue()

gather_process = multiprocessing.Process(target=get_file_path, args=(process_q,))
gather_process.start()

worker_process = []
for i in range(Cores - 2):
worker = multiprocessing.Process(target=mul_heatmap_worker, args=(process_q,result_q))
worker.start()
worker_process.append(worker)

print(f"process start{worker}")

print("all process started")
gather_process.join()
print("gather is done")

for i in range(Cores - 2):
print("before sending None to process")
process_q.put(None)
print("sending None to process")

#process_q.close()
#process_q.join_thread()
#result_q
for i in range(Cores - 2):
print(i)
worker = worker_process
worker.join()
print("one process is done")

print("all process is done")
mask_shape = (1024 + 1, 2048 + 1)
total_mask = np.zeros(mask_shape)
while not result_q.empty():
total_mask += result_q.get().get_HeatMape_plt()
#HM.finalize()
#one_heatmap()
end_time = time.perf_counter()
duration = end_time - start_time

#print(f"the Total bbox are {total_pds_bbox}")
print(f"the duration of the execution is {duration}")
#print(f"time = {execution_time}")
#to_Array(df,["person", "rider"])

def get_file_path(iner_process_q):
total_files = 0
for dirpath, dirnames, filenames in os.walk(Cityscapes_Fine_GT_Path):
global total_json_read
for filename in filenames:
if filename.endswith('json'):
total_files += 1
file_path = os.path.join(dirpath, filename)

iner_process_q.put(file_path)

print("gather is done in method")

'''Two thread are using for Cityscapes dataset evaluation'''
def mul_heatmap_worker(iner_process_q, iner_result_q):
#current 46 sec with I/O thread

thread_q = Queue()

io_thread = threading.Thread(target=read_json_files_to_df, args=(iner_process_q,thread_q))
print("io start")
io_thread.start()

print("processing")
process_thread = threading.Thread(target=sta_Cityscapes, args=(iner_result_q,thread_q))
process_thread.start()

#converter_thread = threading.Thread(target=HM.convert_to_gpu)
#converter_thread.start()

io_thread.join()
print("io is done")
thread_q.put(None)
process_thread.join()
print("processing is done")

#HM.show_HeatMape_plt()
#print(total_json_read)

'''Thread for reading the json file to a DataFrame and puting it to json queue, save I/O time'''
def read_json_files_to_df(iner_process_q, iner_thread_q):
while True:

#get from main queue

file_path = iner_process_q.get()

if file_path is None:
break

df = pd.read_json(file_path)
iner_thread_q.put(df)
#total_json_read += 1

def sta_Cityscapes(iner_result_q,iner_thread_q):

HM_for_worker = HeatMaper(width=2048, height=1024)
while True:
df = iner_thread_q.get()
#global total_pds_bbox
#The last one in the queue will be None
if df is None:
print("read the None")
break
pds = findPedstrain(df, ["person", "rider"])

if pds.size > 0:
pdlocations = pds['objects'].apply(lambda x : x['polygon'])

for alist in pdlocations:
points = np.array(alist)
HM_for_worker.add_to_mask(points)
#total_pds_bbox += 1

iner_result_q.put(HM_for_worker)
print("puting HM")


Подробнее здесь: https://stackoverflow.com/questions/791 ... never-stop
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Разница между многопроцессорностью, асинхронностью, многопоточностью и concurrency.futures в Python
    Anonymous » » в форуме Python
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • PDB не остановится внутри кода pytest, остановится при первом вызове функции
    Гость » » в форуме Python
    0 Ответы
    26 Просмотры
    Последнее сообщение Гость
  • Если возникнет исключение, остановится ли программа
    Anonymous » » в форуме C#
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • Избегайте ошибок во время выполнения, вызванных многопроцессорностью в Windows
    Anonymous » » в форуме Python
    0 Ответы
    27 Просмотры
    Последнее сообщение Anonymous
  • Cover.py с многопроцессорностью работает только с unittest, но не с pytest?
    Anonymous » » в форуме Python
    0 Ответы
    12 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»