Compare commits

..

No commits in common. "8ee3687c73d1dd119962152397d6ac5500091529" and "9227ee971a9bf2e380d787cf8cb1f5f4a8d93ec6" have entirely different histories.

5 changed files with 238 additions and 193 deletions

3
.gitignore vendored

@ -154,5 +154,4 @@ cython_debug/
# Custom
.vscode
config.json
pages
config.json

@ -1,46 +1,53 @@
import re
from os import makedirs, path, rename
from os import makedirs, rename, sep
from shutil import rmtree
from typing import Union
from modules.app import app
from modules.utils import configGet
from modules.app import app, check_project_key, get_api_key
from modules.database import col_photos, col_albums
from modules.security import User, get_current_active_user
from bson.objectid import ObjectId
from bson.errors import InvalidId
from fastapi import HTTPException, Security
from fastapi import HTTPException, Depends, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
from fastapi.openapi.models import APIKey
from starlette.status import HTTP_204_NO_CONTENT, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.post("/albums", response_class=UJSONResponse, description="Create album with name and title")
async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
async def album_create(name: str, title: str, apikey: APIKey = Depends(get_api_key)):
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
if (check_project_key("photos", apikey)):
if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
if col_albums.find_one( {"name": name} ) is not None:
return HTTPException(status_code=HTTP_409_CONFLICT, detail=f"Album with name '{name}' already exists.")
if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
makedirs(path.join("data", "users", current_user.user, "albums", name), exist_ok=True)
if col_albums.find_one( {"name": name} ) is not None:
return HTTPException(status_code=HTTP_409_CONFLICT, detail=f"Album with name '{name}' already exists.")
uploaded = col_albums.insert_one( {"user": current_user.user, "name": name, "title": title} )
makedirs(f'{configGet("data_location")}{sep}photos{sep}images{sep}{name}', exist_ok=True)
return UJSONResponse(
{
"id": uploaded.inserted_id.__str__(),
"name": name,
"title": title
}
)
uploaded = col_albums.insert_one( {"name": name, "title": title} )
return UJSONResponse(
{
"id": uploaded.inserted_id.__str__(),
"name": name,
"title": title
}
)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/albums", description="Find album by name")
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["list"])):
output = {"results": []}
albums = list(col_albums.find( {"user": current_user.user, "name": re.compile(q)} ))
@ -51,89 +58,98 @@ async def album_find(q: str, current_user: User = Security(get_current_active_us
return UJSONResponse(output)
@app.patch("/albums/{id}", response_class=UJSONResponse, description="Modify album's name or title by id")
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, apikey: APIKey = Depends(get_api_key)):
try:
album = col_albums.find_one( {"_id": ObjectId(id)} )
if album is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
if (check_project_key("photos", apikey)):
if title is not None:
if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
else:
title = album["title"]
try:
album = col_albums.find_one( {"_id": ObjectId(id)} )
if album is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
if name is not None:
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
rename(
path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
if title is not None:
if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
else:
title = album["title"]
if name is not None:
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
rename(f'{configGet("data_location")}{sep}photos{sep}images{sep}{album["name"]}', f'{configGet("data_location")}{sep}photos{sep}images{sep}{name}')
col_photos.update_many( {"album": album["name"]}, {"$set": {"album": name}} )
else:
name = album["name"]
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title}} )
return UJSONResponse(
{
"name": name,
"title": title
}
)
col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} )
else:
name = album["name"]
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title}} )
return UJSONResponse(
{
"name": name,
"title": title
}
)
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.put("/albums/{id}", response_class=UJSONResponse, description="Modify album's name and title by id")
async def album_put(id: str, name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
async def album_put(id: str, name: str, title: str, apikey: APIKey = Depends(get_api_key)):
try:
album = col_albums.find_one( {"_id": ObjectId(id)} )
if album is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
if (check_project_key("photos", apikey)):
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
try:
album = col_albums.find_one( {"_id": ObjectId(id)} )
if album is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
if 2 > len(name) > 20:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
rename(
path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
)
if 2 > len(title) > 40:
return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} )
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title}} )
rename(f'{configGet("data_location")}{sep}photos{sep}images{sep}{album["name"]}', f'{configGet("data_location")}{sep}photos{sep}images{sep}{name}')
col_photos.update_many( {"album": album["name"]}, {"$set": {"album": name}} )
return UJSONResponse(
{
"name": name,
"title": title
}
)
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title}} )
return UJSONResponse(
{
"name": name,
"title": title
}
)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.delete("/album/{id}", response_class=UJSONResponse, description="Delete album by id")
async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
async def album_delete(id: str, apikey: APIKey = Depends(get_api_key)):
try:
album = col_albums.find_one_and_delete( {"_id": ObjectId(id)} )
if album is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
col_photos.delete_many( {"album": album["name"]} )
if (check_project_key("photos", apikey)):
rmtree(path.join("data", "users", current_user.user, "albums", album["name"]))
try:
album = col_albums.find_one_and_delete( {"_id": ObjectId(id)} )
if album is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
col_photos.delete_many( {"album": album["name"]} )
return Response(status_code=HTTP_204_NO_CONTENT)
rmtree(f'{configGet("data_location")}{sep}photos{sep}images{sep}{album["name"]}')
return Response(status_code=HTTP_204_NO_CONTENT)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))

@ -1,123 +1,140 @@
import re
import pickle
from secrets import token_urlsafe
from magic import Magic
from datetime import datetime
from os import makedirs, path, remove
from os import makedirs, sep, path, remove
from modules.hasher import get_phash, get_duplicates
from modules.security import User, get_current_active_user
from modules.app import app
from modules.utils import configGet
from modules.app import app, check_project_key, get_api_key
from modules.database import col_photos, col_albums, col_tokens
from bson.objectid import ObjectId
from bson.errors import InvalidId
from fastapi import HTTPException, UploadFile, Security
from fastapi import HTTPException, Depends, UploadFile
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_409_CONFLICT
from fastapi.openapi.models import APIKey
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.post("/albums/{album}/photos", response_class=UJSONResponse, description="Upload a photo to album")
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, apikey: APIKey = Depends(get_api_key)):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
if (check_project_key("photos", apikey)):
# if not file.content_type.startswith("image"):
# return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.")
if col_albums.find_one( {"name": album} ) is None:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True)
# if not file.content_type.startswith("image"):
# return HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.")
filename = file.filename
makedirs(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}', exist_ok=True)
if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)):
base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
filename = file.filename
with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f:
f.write(await file.read())
if path.exists(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}{sep}{file.filename}'):
base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
file_hash = await get_phash(path.join("data", "users", current_user.user, "albums", album, filename))
duplicates = await get_duplicates(file_hash, album)
with open(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}{sep}{filename}', "wb") as f:
f.write(await file.read())
file_hash = await get_phash(f'data{sep}users{sep}sample_user{sep}albums{sep}{album}{sep}{filename}')
duplicates = await get_duplicates(file_hash, album)
if len(duplicates) > 0 and ignore_duplicates is False:
return UJSONResponse(
{
"detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
"duplicates": duplicates
},
status_code=HTTP_409_CONFLICT
)
uploaded = col_photos.insert_one( {"album": album, "hash": file_hash, "filename": filename} )
if len(duplicates) > 0 and ignore_duplicates is False:
return UJSONResponse(
{
"detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
"duplicates": duplicates
},
status_code=HTTP_409_CONFLICT
"id": uploaded.inserted_id.__str__(),
"album": album,
"hash": file_hash,
"filename": filename
}
)
uploaded = col_photos.insert_one( {"user": current_user.user, "album": album, "hash": file_hash, "filename": filename} )
return UJSONResponse(
{
"id": uploaded.inserted_id.__str__(),
"album": album,
"hash": file_hash,
"filename": filename
}
)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/photos/{id}", description="Get a photo by id")
async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.view"])):
async def photo_get(id: str, apikey: APIKey = Depends(get_api_key)):
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
if image is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.")
if (check_project_key("photos", apikey)):
image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
if image is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.")
mime = Magic(mime=True).from_file(image_path)
image_path = f'data{sep}users{sep}sample_user{sep}albums{sep}{image["album"]}{sep}{image["filename"]}'
with open(image_path, "rb") as f: image_file = f.read()
mime = Magic(mime=True).from_file(image_path)
return Response(image_file, media_type=mime)
with open(image_path, "rb") as f: image_file = f.read()
return Response(image_file, media_type=mime)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.delete("/photos/{id}", description="Delete a photo by id")
async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
async def photo_delete(id: str, apikey: APIKey = Depends(get_api_key)):
try:
image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} )
if image is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.")
if (check_project_key("photos", apikey)):
remove(path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]))
try:
image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} )
if image is None:
raise InvalidId(id)
except InvalidId:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.")
return Response(status_code=HTTP_204_NO_CONTENT)
remove(f'data{sep}users{sep}sample_user{sep}albums{sep}{image["album"]}{sep}{image["filename"]}')
return Response(status_code=HTTP_204_NO_CONTENT)
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/albums/{album}/photos", response_class=UJSONResponse, description="Find a photo by filename")
async def photo_find(q: str, album: str, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
async def photo_find(q: str, album: str, page: int = 1, page_size: int = 100, apikey: APIKey = Depends(get_api_key)):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
if (check_project_key("photos", apikey)):
if page <= 0 or page_size <= 0:
return HTTPException(status_code=HTTP_400_BAD_REQUEST, detail="Parameters 'page' and 'page_size' must be greater or equal to 1.")
if col_albums.find_one( {"name": album} ) is None:
return HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
output = {"results": []}
skip = (page-1)*page_size
images = list(col_photos.find({"user": current_user.user, "album": album, "filename": re.compile(q)}, limit=page_size, skip=skip))
if page <= 0 or page_size <= 0:
return HTTPException(status_code=HTTP_400_BAD_REQUEST, detail="Parameters 'page' and 'page_size' must be greater or equal to 1.")
for image in images:
output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"]})
output = {"results": []}
skip = (page-1)*page_size
images = list(col_photos.find({"album": album, "filename": re.compile(q)}, limit=page_size, skip=skip))
if col_photos.count_documents( {"user": current_user.user, "album": album, "filename": re.compile(q)} ) > page*page_size:
token = str(token_urlsafe(32))
col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} )
output["next_page"] = f"/albums/{album}/photos/token?token={token}" # type: ignore
for image in images:
output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"]})
with open("something.txt", "w", encoding="utf-8") as f:
f.write(pickle.loads(pickle.dumps(current_user)).user)
if col_photos.count_documents( {"album": album, "filename": re.compile(q)} ) > page*page_size:
token = str(token_urlsafe(32))
col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "apikey": apikey} )
output["next_page"] = f"https://api.end-play.xyz/photoFindToken?token={token}" # type: ignore
return UJSONResponse(output)
return UJSONResponse(output)
@app.get("/albums/{album}/photos/token", response_class=UJSONResponse, description="Find a photo by token")
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/photos/token/{token}", response_class=UJSONResponse, description="Find a photo by token")
async def photo_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} )
@ -125,4 +142,4 @@ async def photo_find_token(token: str):
if found_record is None:
return HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid search token.")
return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"]))
return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], apikey=found_record["apikey"])

@ -2,7 +2,7 @@ from datetime import timedelta
from modules.database import col_users
from modules.app import app
from fastapi import Depends, HTTPException, Response
from fastapi import Depends, HTTPException, Security, Response
from starlette.status import HTTP_204_NO_CONTENT
from fastapi.security import (
OAuth2PasswordRequestForm,
@ -15,6 +15,7 @@ from modules.security import (
authenticate_user,
create_access_token,
get_current_active_user,
get_current_user,
get_password_hash
)
@ -43,13 +44,13 @@ async def create_users(user: str, email: str, password: str):
return Response(status_code=HTTP_204_NO_CONTENT)
# @app.get("/users/me/items/")
# async def read_own_items(
# current_user: User = Security(get_current_active_user, scopes=["items"])
# ):
# return [{"item_id": "Foo", "owner": current_user.user}]
@app.get("/users/me/items/")
async def read_own_items(
current_user: User = Security(get_current_active_user, scopes=["items"])
):
return [{"item_id": "Foo", "owner": current_user.user}]
# @app.get("/status/")
# async def read_system_status(current_user: User = Depends(get_current_user)):
# return {"status": "ok"}
@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}

@ -1,8 +1,10 @@
from datetime import datetime, timedelta
from typing import List, Union
from modules.database import col_users
from modules.app import app
from fastapi import Depends, HTTPException, Security, status
from starlette.status import HTTP_204_NO_CONTENT
from fastapi.security import (
OAuth2PasswordBearer,
SecurityScopes,
@ -18,6 +20,22 @@ ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_DAYS = 180
fake_users_db = {
"johndoe": {
"user": "johndoe",
"email": "johndoe@example.com",
"hash": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"user": "alice",
"email": "alicechains@example.com",
"hash": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}
class Token(BaseModel):
access_token: str
token_type: str
@ -44,13 +62,9 @@ oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={
"me": "Get current user's data.",
"albums.list": "List albums.",
"albums.read": "Read albums data.",
"albums.write": "Modify albums.",
"photos.list": "List photos.",
"photos.read": "View photos.",
"photos.write": "Modify photos."
},
"list": "List albums and images.",
"read": "View albums and images.",
"write": "Manage albums and images."},
)
@ -76,7 +90,7 @@ def authenticate_user(user_name: str, password: str):
return user
def create_access_token( data: dict, expires_delta: Union[timedelta, None] = None ):
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
@ -87,19 +101,18 @@ def create_access_token( data: dict, expires_delta: Union[timedelta, None] = Non
return encoded_jwt
async def get_current_user( security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) ):
async def get_current_user(
security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)
):
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user: str = payload.get("sub")
@ -109,12 +122,9 @@ async def get_current_user( security_scopes: SecurityScopes, token: str = Depend
token_data = TokenData(scopes=token_scopes, user=user)
except (JWTError, ValidationError):
raise credentials_exception
user = get_user(user=token_data.user)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
@ -125,7 +135,9 @@ async def get_current_user( security_scopes: SecurityScopes, token: str = Depend
return user
async def get_current_active_user( current_user: User = Security(get_current_user, scopes=["me"]) ):
async def get_current_active_user(
current_user: User = Security(get_current_user, scopes=["me"])
):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user