"""Authentication helpers: password hashing, JWT issuing and FastAPI dependencies."""

from datetime import datetime, timedelta, timezone
from typing import List, Union

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import (
    OAuth2PasswordBearer,
    SecurityScopes,
)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError

from modules.database import col_users


# NOTE(review): the key is used exactly as read, including any trailing
# newline present in the file. Do not strip it without re-issuing tokens.
with open("secret_key", "r", encoding="utf-8") as f:
    SECRET_KEY = f.read()

ALGORITHM = "HS256"

# Token lifetime applied by create_access_token() when no explicit
# ``expires_delta`` is supplied.
ACCESS_TOKEN_EXPIRE_DAYS = 180


class Token(BaseModel):
    """Model holding an encoded access token and its token type."""

    access_token: str
    token_type: str


class TokenData(BaseModel):
    """Claims extracted from a decoded token: the user name and its scopes."""

    user: Union[str, None] = None
    scopes: List[str] = []


class User(BaseModel):
    """User record without credential material."""

    user: str
    email: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    """User record including the stored password hash."""

    hash: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={
        "me": "Get current user's data.",
        "albums.list": "List albums.",
        "albums.read": "Read albums data.",
        "albums.write": "Modify albums.",
        "photos.list": "List photos.",
        "photos.read": "View photos.",
        "photos.write": "Modify photos."
    },
)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password: str) -> str:
    """Return the hash of ``password`` produced by ``pwd_context``."""
    return pwd_context.hash(password)


def get_user(user: str) -> Union[UserInDB, None]:
    """Look up a user document by name in ``col_users``.

    Args:
        user: Value matched against the ``"user"`` field of the collection.

    Returns:
        A ``UserInDB`` built from the matching document, or ``None`` when no
        document matches.
    """
    found_user = col_users.find_one({"user": user})
    # find_one() yields None on a miss; without this guard the subscripts
    # below raise TypeError and the callers' "user not found" branches are
    # never reached.
    if found_user is None:
        return None
    return UserInDB(
        user=found_user["user"],
        # email/disabled are optional on the model, so tolerate documents
        # that do not store them.
        email=found_user.get("email"),
        disabled=found_user.get("disabled"),
        hash=found_user["hash"],
    )


def authenticate_user(user_name: str, password: str) -> Union[UserInDB, bool]:
    """Check a user name / password pair.

    Returns:
        The ``UserInDB`` record when the user exists and the password
        verifies against its stored hash; ``False`` otherwise.
    """
    user = get_user(user_name)
    if not user:
        return False
    if not verify_password(password, user.hash):
        return False
    return user


def create_access_token(
    data: dict, expires_delta: Union[timedelta, None] = None
) -> str:
    """Encode ``data`` as a signed JWT with an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, not mutated.
        expires_delta: Lifetime of the token. When ``None``, the token lives
            for ``ACCESS_TOKEN_EXPIRE_DAYS`` days.

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Compare against None explicitly: timedelta(0) is falsy and must not
    # silently fall back to the long default lifetime.
    if expires_delta is not None:
        lifetime = expires_delta
    else:
        lifetime = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
    # Timezone-aware UTC "now" instead of the deprecated naive utcnow().
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_user(
    security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
) -> UserInDB:
    """Resolve the user identified by a bearer token and check its scopes.

    Args:
        security_scopes: Scopes required by the dependency chain.
        token: Encoded JWT obtained through ``oauth2_scheme``.

    Returns:
        The ``UserInDB`` record named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 "Could not validate credentials" when the token
            cannot be decoded, has no ``sub`` claim, its claims fail
            validation, or the user does not exist; 401 "Not enough
            permissions" when a required scope is absent from the token.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, user=username)
    except (JWTError, ValidationError) as err:
        raise credentials_exception from err

    user = get_user(user=token_data.user)
    if user is None:
        raise credentials_exception

    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


async def get_current_active_user(
    current_user: User = Security(get_current_user, scopes=["me"])
):
    """Return the current user, rejecting accounts flagged as disabled.

    Raises:
        HTTPException: 400 "Inactive user" when ``current_user.disabled`` is
            truthy.
    """
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user