OAuth2 implemented

This commit is contained in:
Profitroll 2022-12-20 13:28:50 +01:00
parent 4e39d7d4ac
commit 8ee3687c73
4 changed files with 200 additions and 246 deletions

View File

@ -1,23 +1,19 @@
import re import re
from os import makedirs, rename, sep from os import makedirs, path, rename
from shutil import rmtree from shutil import rmtree
from typing import Union from typing import Union
from modules.utils import configGet from modules.app import app
from modules.app import app, check_project_key, get_api_key
from modules.database import col_photos, col_albums from modules.database import col_photos, col_albums
from modules.security import User, get_current_active_user from modules.security import User, get_current_active_user
from bson.objectid import ObjectId from bson.objectid import ObjectId
from bson.errors import InvalidId from bson.errors import InvalidId
from fastapi import HTTPException, Depends, Security from fastapi import HTTPException, Security
from fastapi.responses import UJSONResponse, Response from fastapi.responses import UJSONResponse, Response
from fastapi.openapi.models import APIKey from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
from starlette.status import HTTP_204_NO_CONTENT, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.post("/albums", response_class=UJSONResponse, description="Create album with name and title") @app.post("/albums", response_class=UJSONResponse, description="Create album with name and title")
async def album_create(name: str, title: str, apikey: APIKey = Depends(get_api_key)): async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
if (check_project_key("photos", apikey)):
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False: if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.") return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
@ -31,9 +27,9 @@ async def album_create(name: str, title: str, apikey: APIKey = Depends(get_api_k
if col_albums.find_one( {"name": name} ) is not None: if col_albums.find_one( {"name": name} ) is not None:
return HTTPException(status_code=HTTP_409_CONFLICT, detail=f"Album with name '{name}' already exists.") return HTTPException(status_code=HTTP_409_CONFLICT, detail=f"Album with name '{name}' already exists.")
makedirs(f'{configGet("data_location")}{sep}photos{sep}images{sep}{name}', exist_ok=True) makedirs(path.join("data", "users", current_user.user, "albums", name), exist_ok=True)
uploaded = col_albums.insert_one( {"name": name, "title": title} ) uploaded = col_albums.insert_one( {"user": current_user.user, "name": name, "title": title} )
return UJSONResponse( return UJSONResponse(
{ {
@ -43,11 +39,8 @@ async def album_create(name: str, title: str, apikey: APIKey = Depends(get_api_k
} }
) )
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/albums", description="Find album by name") @app.get("/albums", description="Find album by name")
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["list"])): async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
output = {"results": []} output = {"results": []}
albums = list(col_albums.find( {"user": current_user.user, "name": re.compile(q)} )) albums = list(col_albums.find( {"user": current_user.user, "name": re.compile(q)} ))
@ -58,9 +51,7 @@ async def album_find(q: str, current_user: User = Security(get_current_active_us
return UJSONResponse(output) return UJSONResponse(output)
@app.patch("/albums/{id}", response_class=UJSONResponse, description="Modify album's name or title by id") @app.patch("/albums/{id}", response_class=UJSONResponse, description="Modify album's name or title by id")
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, apikey: APIKey = Depends(get_api_key)): async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
if (check_project_key("photos", apikey)):
try: try:
album = col_albums.find_one( {"_id": ObjectId(id)} ) album = col_albums.find_one( {"_id": ObjectId(id)} )
@ -80,8 +71,11 @@ async def album_patch(id: str, name: Union[str, None] = None, title: Union[str,
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.") return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
if 2 > len(name) > 20: if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.") return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
rename(f'{configGet("data_location")}{sep}photos{sep}images{sep}{album["name"]}', f'{configGet("data_location")}{sep}photos{sep}images{sep}{name}') rename(
col_photos.update_many( {"album": album["name"]}, {"$set": {"album": name}} ) path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
)
col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} )
else: else:
name = album["name"] name = album["name"]
@ -94,13 +88,8 @@ async def album_patch(id: str, name: Union[str, None] = None, title: Union[str,
} }
) )
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.put("/albums/{id}", response_class=UJSONResponse, description="Modify album's name and title by id") @app.put("/albums/{id}", response_class=UJSONResponse, description="Modify album's name and title by id")
async def album_put(id: str, name: str, title: str, apikey: APIKey = Depends(get_api_key)): async def album_put(id: str, name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
if (check_project_key("photos", apikey)):
try: try:
album = col_albums.find_one( {"_id": ObjectId(id)} ) album = col_albums.find_one( {"_id": ObjectId(id)} )
@ -118,9 +107,12 @@ async def album_put(id: str, name: str, title: str, apikey: APIKey = Depends(get
if 2 > len(title) > 40: if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.") return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
rename(f'{configGet("data_location")}{sep}photos{sep}images{sep}{album["name"]}', f'{configGet("data_location")}{sep}photos{sep}images{sep}{name}') rename(
col_photos.update_many( {"album": album["name"]}, {"$set": {"album": name}} ) path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
)
col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} )
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title}} ) col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title}} )
return UJSONResponse( return UJSONResponse(
@ -130,13 +122,8 @@ async def album_put(id: str, name: str, title: str, apikey: APIKey = Depends(get
} }
) )
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.delete("/album/{id}", response_class=UJSONResponse, description="Delete album by id") @app.delete("/album/{id}", response_class=UJSONResponse, description="Delete album by id")
async def album_delete(id: str, apikey: APIKey = Depends(get_api_key)): async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
if (check_project_key("photos", apikey)):
try: try:
album = col_albums.find_one_and_delete( {"_id": ObjectId(id)} ) album = col_albums.find_one_and_delete( {"_id": ObjectId(id)} )
@ -147,9 +134,6 @@ async def album_delete(id: str, apikey: APIKey = Depends(get_api_key)):
col_photos.delete_many( {"album": album["name"]} ) col_photos.delete_many( {"album": album["name"]} )
rmtree(f'{configGet("data_location")}{sep}photos{sep}images{sep}{album["name"]}') rmtree(path.join("data", "users", current_user.user, "albums", album["name"]))
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))

View File

@ -1,44 +1,42 @@
import re import re
import pickle
from secrets import token_urlsafe from secrets import token_urlsafe
from magic import Magic from magic import Magic
from datetime import datetime from datetime import datetime
from os import makedirs, sep, path, remove from os import makedirs, path, remove
from modules.hasher import get_phash, get_duplicates from modules.hasher import get_phash, get_duplicates
from modules.utils import configGet from modules.security import User, get_current_active_user
from modules.app import app, check_project_key, get_api_key from modules.app import app
from modules.database import col_photos, col_albums, col_tokens from modules.database import col_photos, col_albums, col_tokens
from bson.objectid import ObjectId from bson.objectid import ObjectId
from bson.errors import InvalidId from bson.errors import InvalidId
from fastapi import HTTPException, Depends, UploadFile from fastapi import HTTPException, UploadFile, Security
from fastapi.responses import UJSONResponse, Response from fastapi.responses import UJSONResponse, Response
from fastapi.openapi.models import APIKey from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_409_CONFLICT
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.post("/albums/{album}/photos", response_class=UJSONResponse, description="Upload a photo to album") @app.post("/albums/{album}/photos", response_class=UJSONResponse, description="Upload a photo to album")
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, apikey: APIKey = Depends(get_api_key)): async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
if (check_project_key("photos", apikey)): if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
if col_albums.find_one( {"name": album} ) is None:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
# if not file.content_type.startswith("image"): # if not file.content_type.startswith("image"):
# return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.") # return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.")
makedirs(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}', exist_ok=True) makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True)
filename = file.filename filename = file.filename
if path.exists(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}{sep}{file.filename}'): if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)):
base_name = file.filename.split(".")[:-1] base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1] extension = file.filename.split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
with open(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}{sep}{filename}', "wb") as f: with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f:
f.write(await file.read()) f.write(await file.read())
file_hash = await get_phash(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}{sep}{filename}') file_hash = await get_phash(path.join("data", "users", current_user.user, "albums", album, filename))
duplicates = await get_duplicates(file_hash, album) duplicates = await get_duplicates(file_hash, album)
if len(duplicates) > 0 and ignore_duplicates is False: if len(duplicates) > 0 and ignore_duplicates is False:
@ -50,7 +48,7 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F
status_code=HTTP_409_CONFLICT status_code=HTTP_409_CONFLICT
) )
uploaded = col_photos.insert_one( {"album": album, "hash": file_hash, "filename": filename} ) uploaded = col_photos.insert_one( {"user": current_user.user, "album": album, "hash": file_hash, "filename": filename} )
return UJSONResponse( return UJSONResponse(
{ {
@ -61,13 +59,8 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F
} }
) )
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/photos/{id}", description="Get a photo by id") @app.get("/photos/{id}", description="Get a photo by id")
async def photo_get(id: str, apikey: APIKey = Depends(get_api_key)): async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.view"])):
if (check_project_key("photos", apikey)):
try: try:
image = col_photos.find_one( {"_id": ObjectId(id)} ) image = col_photos.find_one( {"_id": ObjectId(id)} )
@ -76,7 +69,7 @@ async def photo_get(id: str, apikey: APIKey = Depends(get_api_key)):
except InvalidId: except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.")
image_path = f'data{sep}users{sep}sample_user{sep}albums{sep}{image["album"]}{sep}{image["filename"]}' image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])
mime = Magic(mime=True).from_file(image_path) mime = Magic(mime=True).from_file(image_path)
@ -84,13 +77,8 @@ async def photo_get(id: str, apikey: APIKey = Depends(get_api_key)):
return Response(image_file, media_type=mime) return Response(image_file, media_type=mime)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.delete("/photos/{id}", description="Delete a photo by id") @app.delete("/photos/{id}", description="Delete a photo by id")
async def photo_delete(id: str, apikey: APIKey = Depends(get_api_key)): async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
if (check_project_key("photos", apikey)):
try: try:
image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} ) image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} )
@ -99,19 +87,14 @@ async def photo_delete(id: str, apikey: APIKey = Depends(get_api_key)):
except InvalidId: except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.")
remove(f'data{sep}users{sep}sample_user{sep}albums{sep}{image["album"]}{sep}{image["filename"]}') remove(path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]))
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/albums/{album}/photos", response_class=UJSONResponse, description="Find a photo by filename") @app.get("/albums/{album}/photos", response_class=UJSONResponse, description="Find a photo by filename")
async def photo_find(q: str, album: str, page: int = 1, page_size: int = 100, apikey: APIKey = Depends(get_api_key)): async def photo_find(q: str, album: str, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
if (check_project_key("photos", apikey)): if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
if col_albums.find_one( {"name": album} ) is None:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
if page <= 0 or page_size <= 0: if page <= 0 or page_size <= 0:
@ -119,22 +102,22 @@ async def photo_find(q: str, album: str, page: int = 1, page_size: int = 100, ap
output = {"results": []} output = {"results": []}
skip = (page-1)*page_size skip = (page-1)*page_size
images = list(col_photos.find({"album": album, "filename": re.compile(q)}, limit=page_size, skip=skip)) images = list(col_photos.find({"user": current_user.user, "album": album, "filename": re.compile(q)}, limit=page_size, skip=skip))
for image in images: for image in images:
output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"]}) output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"]})
if col_photos.count_documents( {"album": album, "filename": re.compile(q)} ) > page*page_size: if col_photos.count_documents( {"user": current_user.user, "album": album, "filename": re.compile(q)} ) > page*page_size:
token = str(token_urlsafe(32)) token = str(token_urlsafe(32))
col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "apikey": apikey} ) col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} )
output["next_page"] = f"https://api.end-play.xyz/photoFindToken?token={token}" # type: ignore output["next_page"] = f"/albums/{album}/photos/token?token={token}" # type: ignore
with open("something.txt", "w", encoding="utf-8") as f:
f.write(pickle.loads(pickle.dumps(current_user)).user)
return UJSONResponse(output) return UJSONResponse(output)
else: @app.get("/albums/{album}/photos/token", response_class=UJSONResponse, description="Find a photo by token")
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/photos/token/{token}", response_class=UJSONResponse, description="Find a photo by token")
async def photo_find_token(token: str): async def photo_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} ) found_record = col_tokens.find_one( {"token": token} )
@ -142,4 +125,4 @@ async def photo_find_token(token: str):
if found_record is None: if found_record is None:
return HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid search token.") return HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid search token.")
return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], apikey=found_record["apikey"]) return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"]))

View File

@ -2,7 +2,7 @@ from datetime import timedelta
from modules.database import col_users from modules.database import col_users
from modules.app import app from modules.app import app
from fastapi import Depends, HTTPException, Security, Response from fastapi import Depends, HTTPException, Response
from starlette.status import HTTP_204_NO_CONTENT from starlette.status import HTTP_204_NO_CONTENT
from fastapi.security import ( from fastapi.security import (
OAuth2PasswordRequestForm, OAuth2PasswordRequestForm,
@ -15,7 +15,6 @@ from modules.security import (
authenticate_user, authenticate_user,
create_access_token, create_access_token,
get_current_active_user, get_current_active_user,
get_current_user,
get_password_hash get_password_hash
) )
@ -44,13 +43,13 @@ async def create_users(user: str, email: str, password: str):
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
@app.get("/users/me/items/") # @app.get("/users/me/items/")
async def read_own_items( # async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"]) # current_user: User = Security(get_current_active_user, scopes=["items"])
): # ):
return [{"item_id": "Foo", "owner": current_user.user}] # return [{"item_id": "Foo", "owner": current_user.user}]
@app.get("/status/") # @app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)): # async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"} # return {"status": "ok"}

View File

@ -1,10 +1,8 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from typing import List, Union from typing import List, Union
from modules.database import col_users from modules.database import col_users
from modules.app import app
from fastapi import Depends, HTTPException, Security, status from fastapi import Depends, HTTPException, Security, status
from starlette.status import HTTP_204_NO_CONTENT
from fastapi.security import ( from fastapi.security import (
OAuth2PasswordBearer, OAuth2PasswordBearer,
SecurityScopes, SecurityScopes,
@ -20,22 +18,6 @@ ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 180 ACCESS_TOKEN_EXPIRE_DAYS = 180
fake_users_db = {
"johndoe": {
"user": "johndoe",
"email": "johndoe@example.com",
"hash": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"user": "alice",
"email": "alicechains@example.com",
"hash": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel): class Token(BaseModel):
access_token: str access_token: str
token_type: str token_type: str
@ -62,9 +44,13 @@ oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token", tokenUrl="token",
scopes={ scopes={
"me": "Get current user's data.", "me": "Get current user's data.",
"list": "List albums and images.", "albums.list": "List albums.",
"read": "View albums and images.", "albums.read": "Read albums data.",
"write": "Manage albums and images."}, "albums.write": "Modify albums.",
"photos.list": "List photos.",
"photos.read": "View photos.",
"photos.write": "Modify photos."
},
) )
@ -101,18 +87,19 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
return encoded_jwt return encoded_jwt
async def get_current_user( async def get_current_user( security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) ):
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes: if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else: else:
authenticate_value = "Bearer" authenticate_value = "Bearer"
credentials_exception = HTTPException( credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials", detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value}, headers={"WWW-Authenticate": authenticate_value},
) )
try: try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user: str = payload.get("sub") user: str = payload.get("sub")
@ -122,9 +109,12 @@ async def get_current_user(
token_data = TokenData(scopes=token_scopes, user=user) token_data = TokenData(scopes=token_scopes, user=user)
except (JWTError, ValidationError): except (JWTError, ValidationError):
raise credentials_exception raise credentials_exception
user = get_user(user=token_data.user) user = get_user(user=token_data.user)
if user is None: if user is None:
raise credentials_exception raise credentials_exception
for scope in security_scopes.scopes: for scope in security_scopes.scopes:
if scope not in token_data.scopes: if scope not in token_data.scopes:
raise HTTPException( raise HTTPException(
@ -135,9 +125,7 @@ async def get_current_user(
return user return user
async def get_current_active_user( async def get_current_active_user( current_user: User = Security(get_current_user, scopes=["me"]) ):
current_user: User = Security(get_current_user, scopes=["me"])
):
if current_user.disabled: if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=400, detail="Inactive user")
return current_user return current_user