Что заставляет программу запускаться один раз, но не постоянно?Python

Программы на Python
Ответить
Anonymous
 Что заставляет программу запускаться один раз, но не постоянно?

Сообщение Anonymous »

Я экспериментирую с multiprocessing.ThreadPool и очередями и наблюдаю странное поведение, которого я не понимаю. Программа запускается один раз, но не делает это постоянно, как следует.
Сначала я подумал, что это проблема с pycharm, поэтому начал с обновления pycharm и моей операционной системы, но это не помогло. Затем я изменил конфигурацию запуска, добавив опцию «Запуск с консолью Python», но она выполняется только один раз. Я также попробовал «Добавление в процесс» согласно https://stackoverflow.com/a/46440286/28215801. Затем я попытался добавить блокировку в код, думая, что это проблема гонки, но это также не решило проблему.
Единственное, что работает, — это запустить программу в режиме отладки, а затем пройти через код, это обеспечивает желаемое поведение, но это непрактично.
worker_core.py

Код: Выделить всё

#import common_imports as ci

import logging
from queue import Queue, Empty
from multiprocessing.pool import ThreadPool
from multiprocessing import Event
from threading import Lock
import time

class MultithreadedPoolAndQueue:
def __init__(self, available_modules):
self.logger = logging.getLogger("Test_Program.MultithreadedPoolAndQueue")
self.available_modules = available_modules
self.logger.info("Multithreaded Pool and Queue Management Initialised")
self.queue = Queue()
self.pool = ThreadPool(processes=len(self.available_modules))
self.all_tasks_completed = Event()
self.lockymclockface = Lock()

def unpack_results(self,results):
filtered_results = []
for key, val in results.items():
if isinstance(val, list):
for item in val:
filtered_results.append({key: item})
else:
filtered_results.append({key: val})

return filtered_results

def fill_queue(self, data):
for d in data:
# Each data will be {DATA_TYPE: Data}
self.queue.put(d)

def parse_results(self, results):
"""
This function will need the filtered results.
It will put the data onto the queue like this: {DATA_TYPE: Data}

:param results:
:return:
"""
self.logger.info(f"Entering parse_results with: {results}")
if results is None:
pass

else:
unpacked_results = self.unpack_results(results)

for result in unpacked_results:
self.logger.info(f"Result added: {result}")
self.queue.put(result)

def error_call(self,e):
self.logger.error(e)
pass

def thread_worker(self):
max_check_count = 10
check = 0
loop_around = 0

while self.queue.empty() is False or loop_around < 2 and check < max_check_count:
try:

item = self.queue.get_nowait()

# Assuming item is in the format {data_type: data}
data_type = next(iter(item))  # Get the first (and only) key from the dictionary
query = item[data_type]

# Then for each module in available modules, apply an asynchronous system
for module in self.available_modules:
for module_name, module_class in module.items():
# Check if the module can handle this data type
if data_type in module_class.accepted_data:
with self.lockymclockface:
module_instance = module_class()
self.pool.apply_async(module_instance.event,args=(query, data_type),callback=self.parse_results,error_callback=self.error_call)

loop_around = 0
check = 0  # Reset check counter on successful item processing

except Empty:
# If the queue is empty, wait for a short time before checking again
time.sleep(0.1)
check += 1
loop_around += 1

except Exception as e:
self.logger.exception(e)
loop_around += 1

# Wait for all async tasks to complete
self.pool.close()
self.pool.join()
self.all_tasks_completed.set()

def wait_for_completion(self):
self.all_tasks_completed.wait()
self.logger.info(f"Queue item: {self.queue.get()}")
self.logger.info("All tasks have been completed.")

Как я это использую:

Код: Выделить всё

threaded_core = worker_core.MultithreadedPoolAndQueue(self.available_modules)
threaded_core.fill_queue(self.targets)
threaded_core.thread_worker()
threaded_core.wait_for_completion()
Полные файлы проекта можно найти по адресу: https://pastebin.com/u/cantcode4sh
ПРИМЕЧАНИЕ. Папка модулей предназначена для в папке Consumer_producer. При запуске не забудьте добавить путь к файлу config.ini и добавить файл config.ini в Test_Program.py при запуске.

Подробнее здесь: https://stackoverflow.com/questions/791 ... ntinuously
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»