Как использовать аутентификацию для конкретных конечных точек в FastAPI?Python

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Как использовать аутентификацию для конкретных конечных точек в FastAPI?

Сообщение Anonymous »

Не могли бы вы сказать мне, как создать конечные точки, которые работают только после аутентификации пользователя? Мне нужны только конечные точки, расположенные под файлами линии = [] для работы таким образом. На данный момент все работает без аутентификации, хотя это не должно.

Код: Выделить всё

import uvicorn
from fastapi import FastAPI, HTTPException, Response, Depends, UploadFile
from fastapi.responses import StreamingResponse, FileResponse
from pydantic import BaseModel, Field
from typing import Annotated, List
from authx import AuthX, AuthXConfig
from sqlalchemy import select
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from passlib.context import CryptContext

# Хэширование паролей
password_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Конфигурация FastAPI и создание сессии БД
app = FastAPI() # Объявляем приложение FastAPI
engine = create_async_engine("sqlite+aiosqlite:///users.db", echo=True) # Объявляем асинхронный движок нашей БД
new_session = async_sessionmaker(engine, expire_on_commit=False) # Запускаем асинхронную сессию с сохранением объектов

async def get_session():
"Обращение к сессии БД"
async with new_session() as session:
yield session

SessionDep = Annotated[AsyncSession, Depends(get_session)] # Создаём зависимость сессии БД от FastAPI

class Base(DeclarativeBase):
pass

class UserModel(Base): # Создаём таблицу в БД, в которой будут находиться данные пользователей
__tablename__ = "users"

# Привязываем данные к стоблцам таблицы
uid: Mapped[int] = mapped_column(primary_key=True, autoincrement=True, nullable=False)
username: Mapped[str] = mapped_column(unique=True, index=True)
password: Mapped[str]

class UserSchema(BaseModel):
username: str = Field(max_length=10)
password: str = Field(min_length=8, max_length=16)

# Конфигурация AuthX
config = AuthXConfig() # Конфигурация AuthX
config.JWT_SECRET_KEY = "kirieshki" # Секретный ключ для создания JWT-токенов
config.JWT_ACCESS_COOKIE_NAME = "access_token"  # Название куки-токена
config.JWT_TOKEN_LOCATION = ['cookies'] # Расположение куки-токенов
security = AuthX(config=config) # Обозначаем конфиг для модуля AuthX

@app.post("/register",
tags=["Авторизация"],
summary="Зарегистрировать аккаунт")
async def register(creds: UserSchema,
response: Response,
session: SessionDep):
"Регистрация пользователя"
# Проверка на существование пользователя с таким же username
check_user = select(UserModel).where(UserModel.username == creds.username)
existing_user = await session.execute(check_user)
if existing_user.scalar_one_or_none():
raise HTTPException(status_code=400, detail="User already exist")

# Хэширование паролей перед сохранением
hashed_password = password_context.hash(creds.password)

# Регистрация нового пользователя, если уже существующй в БД username не найден
new_user = UserModel(username=creds.username, password=hashed_password) # Создание нового пользователя
session.add(new_user) # Добавление нового пользователя в БД
await session.commit() # Сохранение данных в БД
await session.refresh(new_user) # Обновление информации в БД

# Создание JWT-токена с payload
access_token = security.create_access_token(uid=str(new_user.uid))
security.set_access_cookies(access_token, response)

return {"message": "User has been registered"}

@app.post("/login",
tags=["Авторизация"],
summary="Войти в аккаунт")
async def login(creds: UserSchema,
response: Response,
session: SessionDep):
"Вход пользователя"
# Проверка на существование в БД пользователя с таким же именем и паролем
check_user = select(UserModel).where(UserModel.username == creds.username)
existing_user = await session.execute(check_user)
user = existing_user.scalar_one_or_none()

# Проверяем, существует ли пользователь и совпадает ли хэш пароля с паролем
if not user or not password_context.verify(creds.password, user.password):
raise HTTPException(status_code=401, detail="Invalid username or password")

# Создание куки-токена
access_token = security.create_access_token(uid=str(user.uid))
security.set_access_cookies(access_token, response)

return {"Success": True}

@app.post("/reset",
tags=["Работа с БД"],
summary="Сброс БД")
async def reset_database():
"Сброс БД"
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
return {"База данных успешно сброшена": True}

files = [] # Список файлов, загружаемых на сервер

@app.get("/get_filenames",
tags=["Получение файлов"],
summary="Получить названия файлов")
async def get_filenames():
if len(files) == 0: # Проверка на наличие загруженных файлов
return {"File list is empty"}
else:
return files

def iterfile(filename: str): # Обработчик файлов.  Нарезает файлы на чанки для постепенной загрузки
"Обработка файлов / нарезка на чанки"
with open(filename, "rb") as file:
while chunk := file.read(1024 * 1024):
yield chunk

@app.get("/get_files",
tags=["Получение файлов"],
summary="Получить файл")
async def get_file(filename: str):
return FileResponse(filename)

@app.get("/streaming/{filename}",
tags=["Получение файлов"],
summary="Получение файла в стриминге")
async def get_streaming_file(filename: str):
return StreamingResponse(iterfile(filename))

@app.post("/upload",
tags=["Добавление файлов"],
summary="Загрузка файла")
async def upload_file(uploaded_file: UploadFile):
"Загрузка файла"
file = uploaded_file.file
filename = uploaded_file.filename
with open(f"1_{filename}", "wb") as f:
f.write(file.read())
files.append(f"1_{filename}")
return {"File was uploaded": True}

@app.post("/upload_multiple",
tags=["Добавление файлов"],
summary="Загрузка нескольких файлов")
async def upload_files(uploaded_files: list[UploadFile]):
for uploaded_file in upload_files:
file = uploaded_file.file
filename = uploaded_file.filename
with open(f"1_{filename}", "wb") as f:
f.write(file.read())
files.append(f"1_{filename}")
return {"Multiple files were uploaded": True}

if __name__ == "__main__":
uvicorn.run("main:app", host="127.0.0.1", port=8000, reload=True)
Я попытался настроить аутентификацию с помощью модуля Authx, но это не сработало.

Подробнее здесь: https://stackoverflow.com/questions/796 ... in-fastapi
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение

Вернуться в «Python»