"""JWT/OAuth2 authentication helpers: password hashing, token creation and
FastAPI dependencies that resolve the current user from a bearer token."""

from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from modules.database import col_users

# The signing key is read once at import time from a file named "secret_key"
# in the current working directory.
# NOTE(review): the content is used exactly as read (including any trailing
# newline). Stripping it would change the key and invalidate issued tokens,
# so it is deliberately left untouched here.
with open("secret_key", "r", encoding="utf-8") as f:
    SECRET_KEY = f.read()

ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 180


class Token(BaseModel):
    """Access token together with its token type."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Claims extracted from a decoded token: the subject and its scopes."""

    user: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    """User data without the password hash."""

    user: str
    email: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User record including the stored password hash."""

    hash: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "me": "Get current user's data.",
        "albums.list": "List albums.",
        "albums.read": "Read albums data.",
        "albums.write": "Modify albums.",
        "photos.list": "List photos.",
        "photos.read": "View photos.",
        "photos.write": "Modify photos.",
        "videos.list": "List videos.",
        "videos.read": "View videos.",
        "videos.write": "Modify videos.",
    },
)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether *plain_password* matches *hashed_password*."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of *password* produced by the module's ``pwd_context``."""
    return pwd_context.hash(password)


def get_user(user: str) -> Union[UserInDB, None]:
    """Look up *user* in ``col_users``.

    Returns:
        The matching ``UserInDB``, or ``None`` when no record is found.
    """
    found_user = col_users.find_one({"user": user})
    # Callers (authenticate_user, get_current_user) test the result for
    # None/falsiness, so a missing record must not be subscripted.
    if found_user is None:
        return None
    return UserInDB(
        user=found_user["user"],
        email=found_user["email"],
        disabled=found_user["disabled"],
        hash=found_user["hash"],
    )


def authenticate_user(user_name: str, password: str):
    """Check a user name / password pair.

    Returns:
        The ``UserInDB`` on success, ``False`` when the user is unknown or
        the password does not match.
    """
    user = get_user(user_name)
    if not user:
        return False
    if not verify_password(password, user.hash):
        return False
    return user


def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Encode *data* as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is shallow-copied, not mutated.
        expires_delta: Lifetime of the token. When ``None`` the token lasts
            ``ACCESS_TOKEN_EXPIRE_DAYS`` days.

    Returns:
        The encoded token.
    """
    to_encode = data.copy()
    # Compare against None explicitly: timedelta(0) is falsy and would
    # otherwise silently fall back to the long default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    to_encode["exp"] = datetime.now(tz=timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
    """Resolve the user identified by the bearer *token*.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, has no ``sub`` claim, carries invalid claim
            data, or names an unknown user; 401 "Not enough permissions"
            when a scope listed in *security_scopes* is absent from the
            token.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        subject = payload.get("sub")
        if subject is None:
            raise credentials_exception
        token_data = TokenData(scopes=payload.get("scopes", []), user=subject)
    except (JWTError, ValidationError) as err:
        raise credentials_exception from err

    user = get_user(user=token_data.user)
    if user is None:
        raise credentials_exception

    # Every scope the endpoint requires must be present in the token.
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    """Return the current user, rejecting disabled accounts.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user