Fastapi oauth2 + jwt продлевает время опыта при каждом запросеPython

Программы на Python
Ответить Пред. темаСлед. тема
Anonymous
 Fastapi oauth2 + jwt продлевает время опыта при каждом запросе

Сообщение Anonymous »

На примере fastapi мы можем использовать веб-токены ouath2 и json для создания логина для пользователей:
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}

class Token(BaseModel):
access_token: str
token_type: str

class TokenData(BaseModel):
username: str | None = None

class User(BaseModel):
username: str
email: str | None = None
full_name: str | None = None
disabled: bool | None = None

class UserInDB(User):
hashed_password: str

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
return pwd_context.hash(password)

def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user

async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user

@app.post("/token")
async def login_for_access_token(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")

@app.get("/users/me/", response_model=User)
async def read_users_me(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return current_user

@app.get("/users/me/items/")
async def read_own_items(
current_user: Annotated[User, Depends(get_current_active_user)]
):
return [{"item_id": "Foo", "owner": current_user.username}]

Однако мы можем убедиться, что срок действия токена истечет через 30 минут. ACCESS_TOKEN_EXPIRE_MINUTES = 30. Мне бы хотелось иметь токен, срок действия которого будет продлеваться на текущее время + 10 минут каждый раз, когда пользователь делает запрос к любой конечной точке в этом приложении. Таким образом, я мог бы всегда сохранять период бездействия пользователя, в течение которого пользователь не будет внезапно выходить из системы, даже если он активно использует приложение. Есть ли способ сделать это, даже не касаясь хранения пользователей в базах данных, а только с помощью jwts, хранящихся в веб-браузере на стороне клиента? Каковы наилучшие методы решения подобной проблемы? обновлять заголовок авторизации при каждом
запросе? установить куки? Пожалуйста, сообщите.
РЕДАКТИРОВАТЬ: Это моя попытка использовать промежуточное программное обеспечение для изменения заголовков, но оно все еще не работает.
async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception

user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception

# Check if the token needs to be refreshed
expiration_time = datetime.fromtimestamp(payload["exp"], tz=timezone.utc)
if expiration_time - datetime.now(tz=timezone.utc) < timedelta(minutes=REFRESH_INTERVAL_MINUTES):
# Refresh the token with a new expiration time
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
new_access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return user, new_access_token

return user , token

async def get_current_active_user(
current_user_and_token: Annotated[User, Depends(get_current_user)]
):
current_user,_ = current_user_and_token
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user


Подробнее здесь: https://stackoverflow.com/questions/782 ... ry-request
Реклама
Ответить Пред. темаСлед. тема

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

  • Похожие темы
    Ответы
    Просмотры
    Последнее сообщение
  • Apache Artemis продлевает тайм-аут ClientConsumer.close()
    Anonymous » » в форуме JAVA
    0 Ответы
    16 Просмотры
    Последнее сообщение Anonymous
  • ActiveMQ Artemis продлевает тайм-аут ClientConsumer.close()
    Anonymous » » в форуме JAVA
    0 Ответы
    15 Просмотры
    Последнее сообщение Anonymous
  • Я изучаю разработку iOS без какого-либо предыдущего опыта программирования. Это нормально? [закрыто]
    Гость » » в форуме IOS
    0 Ответы
    84 Просмотры
    Последнее сообщение Гость
  • Можно ли найти работу в сфере программирования без опыта? [закрыто]
    Гость » » в форуме CSS
    0 Ответы
    25 Просмотры
    Последнее сообщение Гость
  • Библиотека компонентов React (или HTML5) для 3D-редакторов (запрос совета/опыта) [закрыто]
    Anonymous » » в форуме CSS
    0 Ответы
    45 Просмотры
    Последнее сообщение Anonymous

Вернуться в «Python»