from datetime import datetime, timedelta from typing import List, Union from modules.database import col_users from modules.app import app from fastapi import Depends, HTTPException, Security, status from starlette.status import HTTP_204_NO_CONTENT from fastapi.security import ( OAuth2PasswordBearer, SecurityScopes, ) from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel, ValidationError with open("secret_key", "r", encoding="utf-8") as f: SECRET_KEY = f.read() ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_DAYS = 180 fake_users_db = { "johndoe": { "user": "johndoe", "email": "johndoe@example.com", "hash": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, }, "alice": { "user": "alice", "email": "alicechains@example.com", "hash": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm", "disabled": True, }, } class Token(BaseModel): access_token: str token_type: str class TokenData(BaseModel): user: Union[str, None] = None scopes: List[str] = [] class User(BaseModel): user: str email: Union[str, None] = None disabled: Union[bool, None] = None class UserInDB(User): hash: str pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer( tokenUrl="token", scopes={ "me": "Get current user's data.", "list": "List albums and images.", "read": "View albums and images.", "write": "Manage albums and images."}, ) def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_user(user: str): found_user = col_users.find_one( {"user": user} ) return UserInDB(user=found_user["user"], email=found_user["email"], disabled=found_user["disabled"], hash=found_user["hash"]) def authenticate_user(user_name: str, password: str): user = get_user(user_name) if not user: return False if not verify_password(password, user.hash): return False return user def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user( security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) ): if security_scopes.scopes: authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' else: authenticate_value = "Bearer" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": authenticate_value}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user: str = payload.get("sub") if user is None: raise credentials_exception token_scopes = payload.get("scopes", []) token_data = TokenData(scopes=token_scopes, user=user) except (JWTError, ValidationError): raise credentials_exception user = get_user(user=token_data.user) if user is None: raise credentials_exception for scope in security_scopes.scopes: if scope not in token_data.scopes: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Not enough permissions", headers={"WWW-Authenticate": authenticate_value}, ) return user async def get_current_active_user( current_user: User = Security(get_current_user, scopes=["me"]) ): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") return current_user