from datetime import datetime, timedelta, timezone
from os import getenv
from typing import List, Union

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from modules.database import col_users
from modules.utils import configGet


def _resolve_secret_key():
    """Return the JWT signing secret, preferring the environment over config.

    The config key ``secret`` must exist (it may be an empty string). A
    non-empty ``PHOTOSAPI_SECRET`` environment variable takes precedence over
    the config value. An empty environment variable is treated as unset so
    that it can never silently become an empty signing key.

    Raises:
        KeyError: If the config key ``secret`` is missing, or if the config
            value is an empty string and no non-empty environment variable
            is provided.
    """
    try:
        config_secret = configGet("secret")
    except KeyError as exc:
        raise KeyError(
            "PhotosAPI secret is not set. Secret key handling has changed in PhotosAPI 0.6.0, so you need to add the config key 'secret' to your config file."
        ) from exc

    env_secret = getenv("PHOTOSAPI_SECRET")

    # Truthiness check on purpose: PHOTOSAPI_SECRET="" must not win over config.
    if env_secret:
        return env_secret

    if config_secret == "":
        raise KeyError(
            "PhotosAPI secret is not set. Set the config key 'secret' or provide the environment variable 'PHOTOSAPI_SECRET' containing a secret string."
        )

    return config_secret


# Resolved once at import time; a misconfigured secret aborts the import.
SECRET_KEY = _resolve_secret_key()
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 180


# NOTE: the models below are documented with comments rather than docstrings
# because pydantic copies class docstrings into the generated JSON schema.


# Token payload pair: the encoded access token and its type.
class Token(BaseModel):
    access_token: str
    token_type: str


# Data extracted from a decoded token: the user name and the granted scopes.
class TokenData(BaseModel):
    user: Union[str, None] = None
    scopes: List[str] = []


# User fields without the password hash.
class User(BaseModel):
    user: str
    email: Union[str, None] = None
    quota: Union[int, None] = None
    disabled: Union[bool, None] = None


# User as stored in the database, including the password hash.
class UserInDB(User):
    hash: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "me": "Get current user's data.",
        "albums.list": "List albums.",
        "albums.read": "Read albums data.",
        "albums.write": "Modify albums.",
        "photos.list": "List photos.",
        "photos.read": "View photos.",
        "photos.write": "Modify photos.",
        "videos.list": "List videos.",
        "videos.read": "View videos.",
        "videos.write": "Modify videos.",
    },
)


def verify_password(plain_password, hashed_password) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``.

    Verification is delegated to the module-level ``pwd_context``.
    """
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password) -> str:
    """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
    return pwd_context.hash(password)
async def get_user(user: str) -> UserInDB:
    """Load a user document from ``col_users`` and return it as ``UserInDB``.

    A stored quota of ``None`` is replaced by the ``default_user_quota``
    config value.

    Args:
        user: User name to look up (matched against the ``user`` field).

    Raises:
        RuntimeError: If no document with that user name exists.
    """
    found_user = await col_users.find_one({"user": user})

    if found_user is None:
        raise RuntimeError(f"User {user} does not exist")

    return UserInDB(
        user=found_user["user"],
        email=found_user["email"],
        quota=found_user["quota"]
        if found_user["quota"] is not None
        else configGet("default_user_quota"),
        disabled=found_user["disabled"],
        hash=found_user["hash"],
    )


async def authenticate_user(user_name: str, password: str) -> Union[UserInDB, bool]:
    """Check a user name / password pair.

    Returns:
        The matching ``UserInDB`` when the user exists and the password
        verifies against the stored hash, otherwise ``False``.
    """
    # get_user() signals an unknown user by raising, never by returning a
    # falsy value, so the failure must be caught here to yield ``False``
    # instead of propagating as an unhandled error.
    try:
        user = await get_user(user_name)
    except RuntimeError:
        return False

    return user if verify_password(password, user.hash) else False


def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not mutated.
        expires_delta: Token lifetime. When ``None``, the lifetime is
            ``ACCESS_TOKEN_EXPIRE_DAYS`` days.

    Returns:
        The token produced by ``jwt.encode`` using ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, and a plain
    # truthiness test would silently replace it with the long default.
    if expires_delta is None:
        expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)

    to_encode = data.copy()
    to_encode["exp"] = datetime.now(tz=timezone.utc) + expires_delta

    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
) -> UserInDB:
    """Resolve the bearer token to a user and enforce the required scopes.

    Args:
        security_scopes: Scopes required by the dependant.
        token: Bearer token supplied through ``oauth2_scheme``.

    Returns:
        The ``UserInDB`` named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded or validated, has no ``sub`` claim, or names a
            user that does not exist; 401 "Not enough permissions" when a
            required scope is absent from the token.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user = payload.get("sub")

        if user is None:
            raise credentials_exception

        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, user=user)
    except (JWTError, ValidationError) as exc:
        raise credentials_exception from exc

    # get_user() raises for an unknown user (it never returns None), so a
    # token whose user no longer exists must be mapped to 401 here.
    try:
        user_record = await get_user(user=token_data.user)
    except RuntimeError as exc:
        raise credentials_exception from exc

    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    return user_record


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
) -> User:
    """Return the current user, rejecting accounts flagged as disabled.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if current_user.disabled:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST, detail="Inactive user"
        )

    return current_user