Автоматически удалять файлы при удалении связанного пользователя из базы данных.Python

Программы на Python
Ответить
Anonymous
 Автоматически удалять файлы при удалении связанного пользователя из базы данных.

Сообщение Anonymous »

Я создал виртуального помощника для своей компании. Фронтенд — React, бэкенд — Python, Docker. Существует панель администратора, но PDF-файлы пользователей по-прежнему существуют даже для удаленных пользователей. Я хочу изменить свой код, чтобы PDF-файлы автоматически удалялись, когда администратор удаляет пользователей.

Код: Выделить всё

import uuid
from db.psql_connector import DB, default_config
from datetime import datetime
from .jwt_handler import JWTHandler
from type_def.auth import (
AuthSuccess,
AuthError,
RegisterUserModel,
LoginUserModel,
User,
)
from typing import Optional, Union
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer, APIKeyHeader
from auth.api_key_handler import APIKeyManager
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError, InvalidHash
import os
import shutil

class AuthHandler:
def __init__(self, user: Union[User, None] = None):
self.db = DB(default_config())
self.user = user

async def get_all_users(self) -> Union[list[User], AuthError]:
"""
Retrieves a list of all registered users.  This function should
only be accessible by super admin users.
"""

if not self.user or not self.user.is_super_admin:
return AuthError("Insufficient permissions to list users")

try:
self.db.exec("SELECT * FROM users")
results = self.db.fetchall()
users = results  # Return public representation
return users
except Exception as e:
return AuthError(str(e))

async def register(self, reg: RegisterUserModel) -> Union[AuthSuccess, AuthError]:
try:
password = reg.password.encode("utf-8")
# enc_password = bcrypt.hashpw(password, bcrypt.gensalt())
enc_password = PasswordHasher().hash(password)
# check email already exists
user_exist = await self.check_email_exist(reg.email)
if user_exist:
return AuthError("User is already exist.  Please login")

self.db.exec(
"INSERT INTO users(id, first_name, last_name, email, password, tel, created_ts, updated_ts, last_login_ts) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING *",
(
str(uuid.uuid4()),
reg.first_name,
reg.last_name,
reg.email,
enc_password,
reg.tel,
datetime.now(),
None,
None,
),
)
self.db.commit()
result = self.db.fetchone()
if result:
print('----------------------',result)
return AuthSuccess("User registered successfully", User(**result).pub())
return AuthError("User not created")
except Exception as e:
return AuthError(str(e))

async def login(self, cred: LoginUserModel) -> Union[AuthSuccess, AuthError]:
try:
self.db.exec("SELECT * FROM users WHERE email = %s LIMIT 1", (cred.email,))
result = self.db.fetchone()

if result:
if PasswordHasher().verify(result["password"], cred.password):
print(result['password'],cred.password)
jwt_handler = JWTHandler(aud=f"rae:web")
token = jwt_handler.encode(
{
"id": result["id"],
"email": result["email"],
}
)
self.db.exec(
"UPDATE users SET last_login_ts = %s WHERE id = %s",
(datetime.now(), result["id"]),
)
return AuthSuccess(
"User logged in successfully",
{
"jwt": token,
"user": User(**result).pub(),
"gen_ts": datetime.now().timestamp(),
},
)
return AuthError("Invalid Credentials")
return AuthError("User not found.  Please register")
except VerifyMismatchError as e:
return AuthError("Invalid Credentials")
except Exception as e:
return AuthError(f"{str(e)}")

async def logout():
pass

async def get_user(self, user_id):
pass

async def update_user():
pass

async def delete_user(self, user_id: str) -> bool:
# Delete related records from the database
self.db.exec("DELETE FROM api_keys WHERE user_id = %s", (user_id,))

# Path to the PDF storage directory
pdf_dir = os.path.join("stores", "pdf_dir")

# Query database to get file paths of PDFs related to the user
self.db.exec("SELECT file_name FROM pdfs WHERE user_id = %s", (user_id,))
pdf_files = [row['file_name'] for row in self.db.cursor.fetchall()]

# Debugging: Print out file names
print(f"PDFs to delete for user {user_id}: {pdf_files}")

for pdf_file in pdf_files:
pdf_path = os.path.join(pdf_dir, pdf_file)
if os.path.exists(pdf_path):
print(f"Deleting file: {pdf_path}")  # Debug log
os.remove(pdf_path)
else:
print(f"File not found: {pdf_path}")  # Debug log

# Delete records from the `pdfs` table and the `users` table
self.db.exec("DELETE FROM pdfs WHERE user_id = %s", (user_id,))
self.db.exec("DELETE FROM users WHERE id = %s", (user_id,))
self.db.commit()
return True

async def reset_password():
pass

async def change_password(
self, user_id, new_password, curr_password=None
) -> AuthSuccess | AuthError:
try:
if self.user and (self.user.id == user_id or self.user.is_super_admin):
self.db.exec("SELECT * FROM users WHERE id = %s LIMIT 1", (user_id,))
result = self.db.fetchone()
if result:
if self.user.is_super_admin:
enc_password = PasswordHasher().hash(new_password)
self.db.exec(
"UPDATE users SET password = %s WHERE id = %s",
(enc_password, user_id),
)
self.db.commit()
return AuthSuccess("Password Changed Successfully", {})
else:
if PasswordHasher().verify(result["password"], curr_password):
enc_password = PasswordHasher().hash(new_password)
self.db.exec(
"UPDATE users SET password = %s WHERE id = %s",
(enc_password, user_id),
)
self.db.commit()
return AuthSuccess("Password Changed Successfully", {})
return AuthError("User not found")
return AuthError("No permission to change password")
except VerifyMismatchError as e:
return AuthError("Invalid Credentials")
except InvalidHash as e:
return AuthError("Invalid Credentials")
except Exception as e:
return AuthError(f"Error: {str(e)}")

async def send_verification_email():
pass

async def verify_email():
pass

async def check_email_exist(self, email: str) -> bool:
self.db.exec("SELECT * FROM users WHERE email = %s LIMIT 1", (email,))
if self.db.cursor.fetchone():
return True
return False

async def get_current_user_jwt(
token: str = Depends(OAuth2PasswordBearer(tokenUrl="token")),
) -> Optional[dict]:
jwt_handler = JWTHandler(aud=f"rae:web")
user_dict = jwt_handler.decode(token)
return user_dict

def get_current_active_user_jwt(
current_user: Optional[dict] = Depends(get_current_user_jwt),
) ->  Optional[User]:
if current_user:
db = DB(default_config())
db.exec(
"SELECT * FROM users WHERE id = %s LIMIT 1",
(current_user["id"],),
)
result = db.fetchone()
user = User(**result)
if not user.disabled:
return user
return None

async def get_current_active_admin_user_jwt(
current_user: User = Depends(get_current_user_jwt),
) -> Optional[User]:
if not current_user.disabled and current_user.is_super_admin:
return current_user
return None

async def get_current_user_api_key(
token: str = Depends(APIKeyHeader(name="X-Api-Key")),
) -> Optional[User]:
print(token)
result = APIKeyManager().safe_get(token)
user = User(**result)
return user

def get_current_active_user_api_key(
current_user: Optional[dict] = Depends(get_current_user_api_key),
) -> Optional[User]:
if not current_user.disabled:
return current_user
return None
Файлы PDF должны удаляться автоматически, как и все записи пользователя.

Подробнее здесь: https://stackoverflow.com/questions/791 ... m-database
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»