Внутренний вызов функции в FastAPI/параметр, который я могу установить только внутри приложения FastAPI.Python

Программы на Python
Ответить
Anonymous
 Внутренний вызов функции в FastAPI/параметр, который я могу установить только внутри приложения FastAPI.

Сообщение Anonymous »

Я написал свою собственную логику сеанса и использую следующий декоратор для проверки запроса:

Код: Выделить всё

def require_session(func):
@wraps(func)
async def wrapper(*args, **kwargs):
request = kwargs['request']
session_key = request.headers.get('X-Session-Key')
if session_key is not None:
session = Session.check(session_key)  # Check in db
if session is not None:
return await func(*args, **kwargs)
raise exc.access_denied()
return wrapper
И я использую это вот так

Код: Выделить всё

import fastapi as fa

employees_router = fa.APIRouter(default_response_class=fa.responses.JSONResponse)

@employees_router.get('/{id}')
@require_session
async def get_employee(request: fa.Request, id: int) -> EmployeeDBSchema:
employee = Employee.get_or_none(id=id)
if employee is None:
raise exc.instance_by_field_not_found(Employee, id)
return employee
Я просто проверяю сеансовый ключ сначала в заголовках, а затем в базе данных. И все работает нормально, пока я не попытаюсь вызвать эти методы внутри других методов.

Код: Выделить всё

@administrators_router.post('')
@require_session
async def create_administrator(request: fa.Request, data: AdministratorSchema) -> AdministratorDBSchema:
data_dump = data.model_dump()
if isinstance(data_dump['employee'], int):
await get_employee(request, data_dump['employee'])  # Inner call
try:
admin = Administrator.create(**data_dump)
except pw.IntegrityError as e:
raise exc.integrity(e)
return admin
Это приводит к тому, что сеанс проверяется дважды. Как мне избежать проверки второго сеанса?
Я искал что-то вроде "Параметр, который я могу установить только внутри приложения fastapi" просто определить дополнительный параметр Internal_call=false, который сообщает декоратору не проверять сессию, но я ничего не нашел.
Второе решение — разделить конечная точка из его реализации выглядит следующим образом:

Код: Выделить всё

@employees_router.get('/{id}')
@require_session
async def get_employee(request: fa.Request, id: int) -> EmployeeDBSchema:
return get_employee_impl(id)

async def get_employee_impl(id: int) -> EmployeeDBSchema:
employee = Employee.get_or_none(id=id)
if employee is None:
raise exc.instance_by_field_not_found(Employee, id)
return employee
И просто вызовите get_employee_impl, где я хочу избежать проверки сеанса, но переписывание всех функций может занять много времени. Кроме того, это решение мне кажется не очень чистым.


Подробнее здесь: https://stackoverflow.com/questions/791 ... de-the-fas
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»