"""Authentication helpers: password hashing, JWT issuing and FastAPI dependencies."""

from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from modules.database import col_users

# The file content is used verbatim as the signing key (no stripping), so any
# trailing newline in the file is part of the key.
with open("secret_key", "r", encoding="utf-8") as f:
    SECRET_KEY = f.read()

ALGORITHM = "HS256"
# Default token lifetime used when create_access_token() gets no explicit delta.
ACCESS_TOKEN_EXPIRE_DAYS = 180


class UserNotFoundError(RuntimeError):
    """Raised by get_user() when no record matches the requested user name.

    Subclasses RuntimeError so callers that already catch RuntimeError keep
    working, while this module can catch the "not found" case narrowly.
    """


class Token(BaseModel):
    """Token response body: the encoded token and its type."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Claims extracted from a decoded token: the subject and granted scopes."""

    user: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    """Public user fields."""

    user: str
    email: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User as stored in the database, including the password hash."""

    hash: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "me": "Get current user's data.",
        "albums.list": "List albums.",
        "albums.read": "Read albums data.",
        "albums.write": "Modify albums.",
        "photos.list": "List photos.",
        "photos.read": "View photos.",
        "photos.write": "Modify photos.",
        "videos.list": "List videos.",
        "videos.read": "View videos.",
        "videos.write": "Modify videos.",
    },
)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return True if ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by the module's CryptContext."""
    return pwd_context.hash(password)


async def get_user(user: str) -> UserInDB:
    """Fetch a user record from ``col_users`` by user name.

    Args:
        user: Value matched against the ``user`` field of the stored record.

    Returns:
        The matching record as a ``UserInDB``.

    Raises:
        UserNotFoundError: (a ``RuntimeError``) if no record matches.
    """
    found_user = await col_users.find_one({"user": user})

    if found_user is None:
        raise UserNotFoundError(f"User {user} does not exist")

    return UserInDB(
        user=found_user["user"],
        email=found_user["email"],
        disabled=found_user["disabled"],
        hash=found_user["hash"],
    )


async def authenticate_user(user_name: str, password: str) -> Union[UserInDB, bool]:
    """Check a user name / password pair.

    Args:
        user_name: Name of the user to look up.
        password: Plain-text password to verify against the stored hash.

    Returns:
        The ``UserInDB`` record when the credentials are valid, otherwise
        ``False`` (unknown user or wrong password).
    """
    try:
        user = await get_user(user_name)
    except UserNotFoundError:
        # get_user() raises rather than returning None; an unknown user is an
        # authentication failure, not a server error.
        return False

    return user if verify_password(password, user.hash) else False


def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not mutated.
        expires_delta: Token lifetime. When falsy (``None`` or a zero delta),
            ``ACCESS_TOKEN_EXPIRE_DAYS`` days is used instead.

    Returns:
        The encoded token string.
    """
    to_encode = data.copy()
    lifetime = expires_delta or timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    to_encode["exp"] = datetime.now(tz=timezone.utc) + lifetime
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
) -> UserInDB:
    """FastAPI dependency resolving the bearer token to a user record.

    Args:
        security_scopes: Scopes required by the dependant endpoint(s).
        token: Bearer token extracted by ``oauth2_scheme``.

    Returns:
        The ``UserInDB`` record named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 if the token cannot be decoded or validated, has no
            ``sub`` claim, names a user that does not exist, or lacks one of
            the required scopes.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user: str = payload.get("sub")
        if user is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, user=user)
    except (JWTError, ValidationError) as exc:
        raise credentials_exception from exc

    try:
        user_record = await get_user(user=token_data.user)
    except UserNotFoundError as exc:
        # A well-formed token whose subject no longer exists must be rejected
        # as invalid credentials instead of surfacing as an unhandled error.
        raise credentials_exception from exc

    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    return user_record


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
) -> User:
    """FastAPI dependency returning the current user if it is not disabled.

    Requires the ``me`` scope via ``get_current_user``.

    Raises:
        HTTPException: 400 if the user's ``disabled`` flag is truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user