Проблема с concurrent.futures.ThreadPoolExecutorPython

Программы на Python
Ответить
Anonymous
 Проблема с concurrent.futures.ThreadPoolExecutor

Сообщение Anonymous »

Я создаю скрипт для анализа файлов, и чтобы сделать его быстрее, я использую потоки для одновременной обработки большего количества файлов и тайм-аут для больших файлов, но когда он достигает тайм-аута, программа запускается работать все медленнее и медленнее, я думаю, это может быть из-за того, что темы не закрыты?

Код: Выделить всё

import concurrent.futures
import os
import time
import queue
from datetime import datetime
from collections import deque

# Dummy print_progress function to avoid undefined reference
def print_progress(message, config, message_type="info"):
print(f"{message_type.upper()}: {message}")

# Minimal dummy functions for file processing
def read_pdf(file_path, config):
time.sleep(5)  # Simulate a long-running task
return "PDF content"

def search_in_filename(filename, config):
return "sensitive" in filename

def search_in_content(content, filename, config):
return ["found sensitive data"] if "sensitive"  in content else []

def process_with_timeout(func, file_path, config):
timeout_value = config["timeout"]
print_progress(f"Processing {file_path} with timeout {timeout_value} seconds", config, message_type="debug")

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(func, file_path, config)
try:
result = future.result(timeout=timeout_value)
print_progress(f"Finished processing {file_path}", config, message_type="debug")
return result
except concurrent.futures.TimeoutError:
print_progress(f"Timeout reached for {file_path}, task will be canceled", config, message_type="skipping")
future.cancel()
return None
except Exception as e:
print_progress(f"Error processing {file_path}: {e}", config, message_type="skipping")
return None

def process_file(file_path, filename, config, results_queue):
is_sensitive_filename = search_in_filename(filename, config)

if is_sensitive_filename:
print_progress(f"[+] Sensitive data found in filename: {file_path}", config, message_type="info")
results_queue.put({
"path": file_path,
"type": "filename",
"found": f"Sensitive keyword found in filename: {filename}"
})
return

content = None
if filename.endswith(".pdf"):
content = process_with_timeout(read_pdf, file_path, config)

if content is None:
print_progress(f"[-] Timeout or error processing {file_path}", config, message_type="skipping")
return

found_sensitive_data = []
if content:
found_sensitive_data = search_in_content(content, filename, config)

if found_sensitive_data:
print_progress(f"[+] Sensitive data found in {file_path}", config, message_type="info")
results_queue.put({
"path": file_path,
"type": "content",
"found": found_sensitive_data
})

def bfs(config):
results_queue = queue.Queue()
root_dir = config["root_directory"]
q = deque([root_dir])
max_threads = config["max_threads"]
search_delay = config["search_delay"]

analysis_start_time = datetime.now()

with concurrent.futures.ThreadPoolExecutor(max_workers=max_threads) as executor:
futures = []
while q:
dirpath = q.popleft()
try:
for entry in os.scandir(dirpath):
full_entry_path = os.path.join(dirpath, entry.name)
if entry.is_file(follow_symlinks=False):
if config["verbosity_level"] >= 0:
print_progress(f"Analyzing: {full_entry_path}", config, message_type="analyzing")

time.sleep(search_delay)
future = executor.submit(process_file, full_entry_path, entry.name, config, results_queue)
futures.append(future)

except PermissionError:
print_progress(f"Permission denied for directory {dirpath}", config, message_type="skipping")

concurrent.futures.wait(futures, timeout=config["timeout"] * len(futures))

results = []
while not results_queue.empty():
results.append(results_queue.get())

finish_time = datetime.now()
total_time = finish_time - analysis_start_time
print_progress(f"Total analysis time: {total_time}", config, message_type="info")

return results

# Entry point function
def main():
# Default configuration
config = {
"root_directory": "./sample_directory",  # Change this to your desired root directory
"max_threads": 4,                         # Number of threads to use for concurrent file processing
"timeout": 10,                            # Timeout per file in seconds
"search_delay": 0.1,                      # Delay between file searches in seconds
"verbosity_level":  1                      # Verbosity level for print_progress
}

print_progress("Starting file analysis...", config, message_type="info")
results = bfs(config)

print_progress(f"Analysis complete. Found {len(results)} sensitive items.", config, message_type="info")
for result in results:
print(f"Sensitive data: {result}")

if __name__ == "__main__":
main()
Я попробовал Future.cancel(), но это не сработало или я реализовал его неправильно

Подробнее здесь: https://stackoverflow.com/questions/791 ... olexecutor
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»