Я создал виртуального помощника для своей компании. Фронтенд — React, бэкенд — Python, Docker. Существует панель администратора, но PDF-файлы пользователей по-прежнему существуют даже для удаленных пользователей. Я хочу изменить свой код, чтобы PDF-файлы автоматически удалялись, когда администратор удаляет пользователей.
Я создал виртуального помощника для своей компании. Фронтенд — React, бэкенд — Python, Docker. Существует панель администратора, но PDF-файлы пользователей по-прежнему существуют даже для удаленных пользователей. Я хочу изменить свой код, чтобы PDF-файлы автоматически удалялись, когда администратор удаляет пользователей. [code]import uuid from db.psql_connector import DB, default_config from datetime import datetime from .jwt_handler import JWTHandler from type_def.auth import ( AuthSuccess, AuthError, RegisterUserModel, LoginUserModel, User, ) from typing import Optional, Union from fastapi import Depends from fastapi.security import OAuth2PasswordBearer, APIKeyHeader from auth.api_key_handler import APIKeyManager from argon2 import PasswordHasher from argon2.exceptions import VerifyMismatchError, InvalidHash import os import shutil
class AuthHandler: def __init__(self, user: Union[User, None] = None): self.db = DB(default_config()) self.user = user
async def get_all_users(self) -> Union[list[User], AuthError]: """ Retrieves a list of all registered users. This function should only be accessible by super admin users. """
if not self.user or not self.user.is_super_admin: return AuthError("Insufficient permissions to list users")
try: self.db.exec("SELECT * FROM users") results = self.db.fetchall() users = results # Return public representation return users except Exception as e: return AuthError(str(e))
async def login(self, cred: LoginUserModel) -> Union[AuthSuccess, AuthError]: try: self.db.exec("SELECT * FROM users WHERE email = %s LIMIT 1", (cred.email,)) result = self.db.fetchone()
if result: if PasswordHasher().verify(result["password"], cred.password): print(result['password'],cred.password) jwt_handler = JWTHandler(aud=f"rae:web") token = jwt_handler.encode( { "id": result["id"], "email": result["email"], } ) self.db.exec( "UPDATE users SET last_login_ts = %s WHERE id = %s", (datetime.now(), result["id"]), ) return AuthSuccess( "User logged in successfully", { "jwt": token, "user": User(**result).pub(), "gen_ts": datetime.now().timestamp(), }, ) return AuthError("Invalid Credentials") return AuthError("User not found. Please register") except VerifyMismatchError as e: return AuthError("Invalid Credentials") except Exception as e: return AuthError(f"{str(e)}")
async def logout(): pass
async def get_user(self, user_id): pass
async def update_user(): pass
async def delete_user(self, user_id: str) -> bool: # Delete related records from the database self.db.exec("DELETE FROM api_keys WHERE user_id = %s", (user_id,))
# Path to the PDF storage directory pdf_dir = os.path.join("stores", "pdf_dir")
# Query database to get file paths of PDFs related to the user self.db.exec("SELECT file_name FROM pdfs WHERE user_id = %s", (user_id,)) pdf_files = [row['file_name'] for row in self.db.cursor.fetchall()]
# Debugging: Print out file names print(f"PDFs to delete for user {user_id}: {pdf_files}")
for pdf_file in pdf_files: pdf_path = os.path.join(pdf_dir, pdf_file) if os.path.exists(pdf_path): print(f"Deleting file: {pdf_path}") # Debug log os.remove(pdf_path) else: print(f"File not found: {pdf_path}") # Debug log
# Delete records from the `pdfs` table and the `users` table self.db.exec("DELETE FROM pdfs WHERE user_id = %s", (user_id,)) self.db.exec("DELETE FROM users WHERE id = %s", (user_id,)) self.db.commit() return True
async def reset_password(): pass
async def change_password( self, user_id, new_password, curr_password=None ) -> AuthSuccess | AuthError: try: if self.user and (self.user.id == user_id or self.user.is_super_admin): self.db.exec("SELECT * FROM users WHERE id = %s LIMIT 1", (user_id,)) result = self.db.fetchone() if result: if self.user.is_super_admin: enc_password = PasswordHasher().hash(new_password) self.db.exec( "UPDATE users SET password = %s WHERE id = %s", (enc_password, user_id), ) self.db.commit() return AuthSuccess("Password Changed Successfully", {}) else: if PasswordHasher().verify(result["password"], curr_password): enc_password = PasswordHasher().hash(new_password) self.db.exec( "UPDATE users SET password = %s WHERE id = %s", (enc_password, user_id), ) self.db.commit() return AuthSuccess("Password Changed Successfully", {}) return AuthError("User not found") return AuthError("No permission to change password") except VerifyMismatchError as e: return AuthError("Invalid Credentials") except InvalidHash as e: return AuthError("Invalid Credentials") except Exception as e: return AuthError(f"Error: {str(e)}")
async def send_verification_email(): pass
async def verify_email(): pass
async def check_email_exist(self, email: str) -> bool: self.db.exec("SELECT * FROM users WHERE email = %s LIMIT 1", (email,)) if self.db.cursor.fetchone(): return True return False
def get_current_active_user_jwt( current_user: Optional[dict] = Depends(get_current_user_jwt), ) -> Optional[User]: if current_user: db = DB(default_config()) db.exec( "SELECT * FROM users WHERE id = %s LIMIT 1", (current_user["id"],), ) result = db.fetchone() user = User(**result) if not user.disabled: return user return None
async def get_current_active_admin_user_jwt( current_user: User = Depends(get_current_user_jwt), ) -> Optional[User]: if not current_user.disabled and current_user.is_super_admin: return current_user return None
async def get_current_user_api_key( token: str = Depends(APIKeyHeader(name="X-Api-Key")), ) -> Optional[User]: print(token) result = APIKeyManager().safe_get(token) user = User(**result) return user
def get_current_active_user_api_key( current_user: Optional[dict] = Depends(get_current_user_api_key), ) -> Optional[User]: if not current_user.disabled: return current_user return None [/code] Файлы PDF должны удаляться автоматически, как и все записи пользователя.