Как распараллелить приложение Flask с помощью Gunicorn и распределить использование графического процессора между работнPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как распараллелить приложение Flask с помощью Gunicorn и распределить использование графического процессора между работн

Сообщение Anonymous »

Я создаю приложение Flask для обработки встраивания лиц с помощью DeepFace. Моя цель — обслужить примерно 50 клиентов с примерно 10 запросами в минуту. Каждый запрос включает запуск deepface.represent() для обработки изображения, для чего требуются ресурсы графического процессора.
Вот моя настройка:
`app.py: Main Flask app.
model.py: Loads the DeepFace models and includes the represent function.
database.py: Handles database interactions.
wsgi.py: Entrypoint for the Gunicorn server.
gunicorn.conf: Gunicorn configuration file.`

Я хочу запустить приложение с пятью рабочими процессами Gunicorn, каждый из которых использует свой логический графический процессор. Однако я сталкиваюсь с проблемами при настройке подразделения графического процессора:
Логические устройства графического процессора: создание логических устройств графического процессора с использованием TensorFlow (например, tf.config.experimental.set_virtual_device_configuration) необходимо подключить к отдельные работники. Размещение этой конфигурации в model.py (куда загружаются модели), похоже, не работает, поскольку все рабочие процессы используют одни и те же ресурсы графического процессора.
Распараллеливание рабочих: следует ли использовать синхронную (синхронизацию) ) или асинхронные (асинхронные) рабочие процессы в Gunicorn для такого типа рабочей нагрузки? Я стремлюсь обеспечить эффективное распараллеливание без перегрузки графического процессора.
Интеграция Ray: я рассматриваю возможность использования Ray для управления параллельной обработкой. Если необходим Ray, с какими частями приложения вы бы рекомендовали его интегрировать?
Ключевой вопрос:
Как настроить рабочие Gunicorn для каждой использовать отдельное логическое устройство графического процессора, гарантируя, что функция deepface.represent() работает параллельно без конкуренции за ресурсы?
Любые советы о том, где должна находиться логика разделения графического процессора или как структурировать приложение для достижения этой цели, будут очень признательны .
под моим код:
from app import create_app
import os
# To install boto3, run:
# pip install boto3
import boto3

# Add AWS credentials loading
session = boto3.Session()
# This will automatically load credentials from the default chain
# (environment variables, ~/.aws/credentials, or IAM role)

app = create_app()
app.secret_key = os.urandom(24)

if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)

class ModelLoader:
"""
Singleton class for loading and managing face detection and recognition models.
This class ensures models are loaded only once and shared across the application.
"""
_instance = None

def __new__(cls):
if cls._instance is None:
cls._instance = super(ModelLoader, cls).__new__(cls)
cls._instance._initialize()
return cls._instance

def _initialize(self):
"""Initialize model attributes and load models."""
# Adicionar configuração de memória
self.config = self.load_config()
#self.gpu_memory_limit = self.config.get('gpu_memory_limit', 1024) # Valor padrão 1GB

# Create or open GPU log file
self.gpu_log_file = "gpu.log"
with open(self.gpu_log_file, "a") as f:
f.write("\n--- New Session Started ---\n")

self.facenet512_model = None
self.retinaface_model = None
self.mtcnn = None
self.opencv = None
self.load_models()

def load_config(self):
"""Load configurations from config.json file."""
try:
with open("config.json", "r") as file:
config = json.load(file)
return config
except FileNotFoundError:
# Retornar configuração padrão se arquivo não existir
return {
"gpu_memory_limit": 1024, # 1GB
"allow_growth": True
}

def get_model(self, model_name):
"""
Get a specific model by name.

Args:
model_name (str): Name of the model to retrieve.

Returns:
The requested model instance.

Raises:
ValueError: If the requested model is not found.
"""
if model_name == "Facenet512":
return self.facenet512_model
else:
raise ValueError(f"Model {model_name} not found.")

def load_models(self):
"""Load all required models if they haven't been loaded already."""
if self.facenet512_model is None:
start_time = time.time()
logger.info("Loading Facenet512 model...")

try:
self.facenet512_model = DeepFace.build_model("Facenet512")
self.retinaface_model = modeling.build_model(task="face_detector", model_name="retinaface")
self.mtcnn = modeling.build_model(task="face_detector", model_name="mtcnn")
self.opencv = modeling.build_model(task="face_detector", model_name="opencv")

end_time = time.time()
logger.info(f"Facenet512 model loaded in {end_time - start_time:.2f} seconds.")
except Exception as e:
logger.error(f"Error loading models: {str(e)}")
raise

def clear_memory(self):
"""Clear memory for TensorFlow only (no GPU operations)."""
try:

tf.keras.backend.clear_session()
except ImportError:
pass

def _log_resource_usage(self, operation: str):
"""Log GPU and CPU memory usage to gpu.log file."""
try:
# Get CPU usage
cpu_percent = psutil.cpu_percent()
ram_percent = psutil.virtual_memory().percent

# Get GPU usage if available
gpu_info = ""
gpus = GPUtil.getGPUs()
for gpu in gpus:
gpu_info += f"GPU {gpu.id}: Memory Use {gpu.memoryUsed}MB/{gpu.memoryTotal}MB ({gpu.memoryUtil*100:.1f}%) "

# Create log entry
timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
log_entry = f"[{timestamp}] {operation} - CPU: {cpu_percent}%, RAM: {ram_percent}%, {gpu_info}\n"

# Write to log file
with open(self.gpu_log_file, "a") as f:
f.write(log_entry)

except Exception as e:
logger.error(f"Error logging resource usage: {str(e)}")

def _is_frontal_face(self, facial_area: Dict[str, Any], threshold: float = 0.1) -> bool:
"""
Check if the face is frontal based on eye positions and face area.

Args:
facial_area: Dictionary containing face and eye coordinates
threshold: Tolerance threshold for determining if face is frontal

Returns:
bool: True if face is frontal, False otherwise
"""
try:
left_eye = facial_area["left_eye"]
right_eye = facial_area["right_eye"]
face_x, face_y, face_w, face_h = facial_area["x"], facial_area["y"], facial_area["w"], facial_area["h"]

# Calculate horizontal distance between eyes
eye_distance = np.linalg.norm(np.array(left_eye) - np.array(right_eye))

# Calculate ratio between eye distance and face width
eye_face_width_ratio = eye_distance / face_w

# Calculate vertical eye symmetry
eye_height_difference = abs(left_eye[1] - right_eye[1]) / face_h

return eye_height_difference < threshold and 0.3 < eye_face_width_ratio < 0.6

except Exception as e:
logger.error(f"Error checking frontal face: {str(e)}")
return False

def get_embeddings(
self,
img_path: Union[str, np.ndarray],
model_name: str = "Facenet512",
enforce_detection: bool = False,
detector_backend: str = "mtcnn",
align: bool = True,
expand_percentage: int = 0,
normalization: str = "Facenet",
anti_spoofing: bool = False,
max_faces: Optional[int] = 1,
) -> Union[List[Dict[str, Any]], Dict[str, int]]:
"""
Get facial embeddings using the specified model.
Returns resultado: 4 if face is not frontal.
"""
try:
start_time = time.time()
self._log_resource_usage("Before embeddings generation")

embeddings = DeepFace.represent(
img_path=img_path,
model_name=model_name,
enforce_detection=enforce_detection,
detector_backend=detector_backend,
align=align,
expand_percentage=expand_percentage,
normalization=normalization,
anti_spoofing=anti_spoofing,
max_faces=1,
)

if not embeddings:
logger.warning("No faces detected")
return {"resultado": 4}

# Check if face is frontal
if not self._is_frontal_face(embeddings[0]["facial_area"]):
logger.warning("Face is not frontal")
return {"resultado": 4}

end_time = time.time()
self._log_resource_usage("After embeddings generation")

# Store result before cleanup
result = embeddings.copy()
self.clear_memory()
return result

except Exception as e:
logger.error(f"Error getting embeddings: {str(e)}")
raise

def extract_faces(
self,
img_path: Union[str, np.ndarray],
detector_backend: str = "mtcnn",
enforce_detection: bool = True,
align: bool = True,
expand_percentage: int = 30,
grayscale: bool = False,
color_face: str = "rgb",
normalize_face: bool = True,
anti_spoofing: bool = False,
) -> List[Dict[str, Any]]:
"""
Extract faces from the given image.
See DeepFace.extract_faces() for detailed documentation.
"""
try:
self._log_resource_usage("Before face extraction")

faces = DeepFace.extract_faces(
img_path=img_path,
detector_backend=detector_backend,
enforce_detection=enforce_detection,
align=align,
expand_percentage=expand_percentage,
grayscale=grayscale,
color_face=color_face,
normalize_face=normalize_face,
anti_spoofing=anti_spoofing,
)

self._log_resource_usage("After face extraction")
# Store result before cleanup
result = faces.copy() # Make a copy to be safe
self.clear_memory()
return result
except Exception as e:
logger.error(f"Error extracting faces: {str(e)}")
raise

def get_first_face_embedding(
self,
img_path: Union[str, np.ndarray],
model_name: str = "Facenet512",
detector_backend: str = "retinaface",
enforce_detection: bool = False,
align: bool = True,
expand_percentage: int = 0,
normalization: str = "Facenet",
) -> Optional[np.ndarray]:
"""
Extract the first face from an image and return its embedding.

This method takes an image input and performs the following steps:
1. Detects faces in the image using the specified detector backend
2. Takes the first detected face
3. Generates an embedding vector for that face using the specified model

Args:
img_path: Either a string path to an image file or a numpy array containing the image data
model_name: Name of the face embedding model to use (default: Facenet512)
detector_backend: Which face detection model to use (default: retinaface)
enforce_detection: Whether to raise an error if no face is found (default: True)
align: Whether to align the detected face before generating embedding (default: True)
expand_percentage: How much to expand the detected face region by (default: 0)
normalization: Type of normalization to apply to face pixels (default: Facenet)

Returns:
Optional[np.ndarray]: A numpy array containing the face embedding vector if a face is found,
or None if no face is detected and enforce_detection is False

Raises:
Exception: If no face is detected and enforce_detection is True, or if there are other errors
during face detection or embedding generation

"""
try:
# First extract faces
faces = self.extract_faces(
img_path=img_path,
detector_backend=detector_backend,
enforce_detection=enforce_detection,
align=align,
expand_percentage=expand_percentage,
)

if not faces:
logger.warning("No faces detected in image")
return None

# Get embedding for the first face
first_face = faces[0]
embedding = self.get_embeddings(
img_path=first_face['face'], # Use the extracted face image
model_name=model_name,
enforce_detection=False, # Face already detected
detector_backend=detector_backend,
align=False, # Face already aligned
normalization=normalization,
)

return embedding[0]['embedding'] # Return just the embedding array
self.clear_memory()
except Exception as e:
logger.error(f"Error getting first face embedding: {str(e)}")
raise

def is_face_frontal(self, img_path: Union[str, np.ndarray], threshold_angle: float = 30.0) -> int:
"""
Check if the largest detected face is frontal.

Args:
img_path: Image path or numpy array
threshold_angle: Maximum allowed head rotation angle in degrees (default: 30.0)

Returns:
int: 0 if the face is frontal, 1 if not frontal or no face detected
"""
try:
# If input is a string (file path), load the image
if isinstance(img_path, str):
img = cv2.imread(img_path)
if img is None:
logger.error("Failed to load image")
return 1
else:
img = img_path

mtcnn_detector = mtcnn()
faces = mtcnn_detector.detect_faces(img)

if not faces:
logger.warning("No faces detected")
return 1

# Get the largest face based on area
largest_face = max(faces, key=lambda x: x.w * x.h)

# Calculate the angle between eyes
left_eye = largest_face.left_eye
right_eye = largest_face.right_eye

# Calculate angle
dx = right_eye[0] - left_eye[0]
dy = right_eye[1] - left_eye[1]
angle = abs(np.degrees(np.arctan2(dy, dx)))

# Check if face is frontal based on angle
is_frontal = angle < threshold_angle

return 0 if is_frontal else 1

except Exception as e:
logger.error(f"Error checking face orientation: {str(e)}")
return 1

from flask import Flask
from routes import register_routes
import logging
from database import DatabaseLoader
from model import ModelLoader
import os
import boto3

def create_app():
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log', mode='a')
]
)

# Configure AWS credentials
aws_access_key = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
aws_region = os.environ.get('AWS_REGION', 'us-east-1')

# Initialize AWS session
if aws_access_key and aws_secret_key:
boto3.setup_default_session(
aws_access_key_id=aws_access_key,
aws_secret_access_key=aws_secret_key,
region_name=aws_region
)

app = Flask(__name__)

try:
# Initialize singletons
DatabaseLoader() # Initialize database connection
ModelLoader() # Load ML models
except Exception as e:
logging.error(f"Failed to initialize application components: {str(e)}")
raise

# Register routes
register_routes(app)

return app

# Add this for Gunicorn
application = create_app()

# Add this to run the app directly
if __name__ == '__main__':
application.run(host='0.0.0.0', port=5000, debug=True)


Подробнее здесь: https://stackoverflow.com/questions/792 ... among-work
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»