WIP: Custom exceptions handling and new models

This commit is contained in:
Profitroll 2023-02-16 14:11:29 +01:00
parent dddb5dbc12
commit 873e506c7d
7 changed files with 147 additions and 37 deletions

46
classes/exceptions.py Normal file
View File

@ -0,0 +1,46 @@
from typing import Literal
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class AlbumNotFoundError(Exception):
def __init__(self, id: str):
self.id = id
self.openapi = {
"description": "Album does not exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find an album with id '{id}'."
}
}
}
}
class AlbumAlreadyExistsError(Exception):
def __init__(self, name: str):
self.name = name
self.openapi = {
"description": "Album already exists",
"content": {
"application/json": {
"example": {
"detail": "Album with name '{name}' already exists."
}
}
}
}
class AlbumIncorrectError(Exception):
def __init__(self, place: Literal["name", "title"], error: str) -> None:
self.place = place
self.error = error
self.openapi = {
"description": "Album name/title invalid",
"content": {
"application/json": {
"example": {
"detail": "Album {name/title} invalid: {error}"
}
}
}
}

View File

@ -1,4 +1,4 @@
from typing import Union
from typing import Dict, List, Literal, Union
from pydantic import BaseModel
class Photo(BaseModel):
@ -7,21 +7,45 @@ class Photo(BaseModel):
hash: str
filename: str
class PhotoSearch(BaseModel):
id: str
filename: str
caption: Union[str, None]
class Video(BaseModel):
id: str
album: str
hash: str
filename: str
class VideoSearch(BaseModel):
id: str
filename: str
caption: Union[str, None]
class Album(BaseModel):
id: str
name: str
title: str
class AlbumSearch(BaseModel):
id: str
name: str
title: str
class AlbumModified(BaseModel):
name: str
title: str
cover: Union[str, None]
class SearchResults(BaseModel):
results: list
next_page: Union[str, None] = None
class SearchResultsAlbum(BaseModel):
results: List[Album]
next_page: Union[str, None]
class SearchResultsPhoto(BaseModel):
results: List[PhotoSearch]
next_page: Union[str, None]
class SearchResultsVideo(BaseModel):
results: List[VideoSearch]
next_page: Union[str, None]

View File

@ -2,7 +2,8 @@ import re
from os import makedirs, path, rename
from shutil import rmtree
from typing import Union
from classes.models import Album, AlbumModified, SearchResults
from classes.exceptions import AlbumAlreadyExistsError, AlbumIncorrectError, AlbumNotFoundError
from classes.models import Album, AlbumModified, SearchResultsAlbum
from modules.app import app
from modules.database import col_photos, col_albums
from modules.security import User, get_current_active_user
@ -13,20 +14,24 @@ from fastapi import HTTPException, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.post("/albums", description="Create album with name and title", response_class=UJSONResponse, response_model=Album)
album_create_responses = {
406: AlbumIncorrectError("name", "error").openapi,
409: AlbumAlreadyExistsError("name").openapi
}
@app.post("/albums", description="Create album with name and title", response_class=UJSONResponse, response_model=Album, responses=album_create_responses)
async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
if 2 > len(title) > 40:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
if col_albums.find_one( {"name": name} ) is not None:
raise HTTPException(status_code=HTTP_409_CONFLICT, detail=f"Album with name '{name}' already exists.")
raise AlbumAlreadyExistsError(name)
makedirs(path.join("data", "users", current_user.user, "albums", name), exist_ok=True)
@ -40,18 +45,22 @@ async def album_create(name: str, title: str, current_user: User = Security(get_
}
)
@app.get("/albums", description="Find album by name", response_model=SearchResults)
@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
output = {"results": []}
albums = list(col_albums.find( {"user": current_user.user, "name": re.compile(q)} ))
for album in albums:
output["results"].append( {"id": album["_id"].__str__(), "name": album["name"]} )
output["results"].append( {"id": album["_id"].__str__(), "name": album["name"], "title": album["title"]} )
return UJSONResponse(output)
@app.patch("/albums/{id}", description="Modify album's name or title by id", response_class=UJSONResponse, response_model=AlbumModified)
album_patch_responses = {
404: AlbumNotFoundError("id").openapi,
406: AlbumIncorrectError("name", "error").openapi
}
@app.patch("/albums/{id}", description="Modify album's name or title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_patch_responses)
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
try:
@ -59,19 +68,19 @@ async def album_patch(id: str, name: Union[str, None] = None, title: Union[str,
if album is None:
raise InvalidId(id)
except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
raise AlbumNotFoundError(id)
if title is not None:
if 2 > len(title) > 40:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
else:
title = album["title"]
if name is not None:
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
rename(
path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
@ -96,7 +105,11 @@ async def album_patch(id: str, name: Union[str, None] = None, title: Union[str,
}
)
@app.put("/albums/{id}", description="Modify album's name and title by id", response_class=UJSONResponse, response_model=AlbumModified)
album_put_responses = {
404: AlbumNotFoundError("id").openapi,
406: AlbumIncorrectError("name", "error").openapi
}
@app.put("/albums/{id}", description="Modify album's name and title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_put_responses)
async def album_put(id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
try:
@ -104,16 +117,16 @@ async def album_put(id: str, name: str, title: str, cover: str, current_user: Us
if album is None:
raise InvalidId(id)
except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
raise AlbumNotFoundError(id)
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name can only contain: a-z, 0-9 and _ characters.")
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album name must be >2 and <20 characters.")
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
if 2 > len(title) > 40:
raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Album title must be >2 and <40 characters.")
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
image = col_photos.find_one( {"_id": ObjectId(cover), "album": album["name"]} )
cover = image["_id"].__str__() if image is not None else None # type: ignore
@ -134,7 +147,10 @@ async def album_put(id: str, name: str, title: str, cover: str, current_user: Us
}
)
@app.delete("/album/{id}", description="Delete album by id", response_class=UJSONResponse)
album_delete_responses = {
404: AlbumNotFoundError("id").openapi
}
@app.delete("/album/{id}", description="Delete album by id", status_code=HTTP_204_NO_CONTENT, responses=album_delete_responses)
async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
try:
@ -142,7 +158,7 @@ async def album_delete(id: str, current_user: User = Security(get_current_active
if album is None:
raise InvalidId(id)
except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")
raise AlbumNotFoundError(id)
col_photos.delete_many( {"album": album["name"]} )

24
extensions/exceptions.py Normal file
View File

@ -0,0 +1,24 @@
from modules.app import app
from classes.exceptions import *
from starlette.status import HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.exception_handler(AlbumNotFoundError)
async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError):
return JSONResponse(
status_code=HTTP_404_NOT_FOUND,
content={"detail": f"Could not find an album with id '{exc.id}'."},
)
@app.exception_handler(AlbumAlreadyExistsError)
async def album_already_exists_exception_handler(request: Request, exc: AlbumAlreadyExistsError):
return JSONResponse(
status_code=HTTP_409_CONFLICT,
content={"detail": f"Album with name '{exc.name}' already exists."},
)
@app.exception_handler(AlbumIncorrectError)
async def album_incorrect_exception_handler(request: Request, exc: AlbumIncorrectError):
return JSONResponse(
status_code=HTTP_406_NOT_ACCEPTABLE,
content={"detail": f"Album {exc.place} invalid: {exc.error}"},
)

View File

@ -7,7 +7,7 @@ from typing import List, Union
from magic import Magic
from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system
from classes.models import Photo, SearchResults
from classes.models import Photo, SearchResultsPhoto
from modules.exif_reader import extract_location
from modules.hasher import get_phash, get_duplicates
from modules.scheduler import scheduler
@ -51,7 +51,7 @@ async def compress_image(image_path: str):
size_after = path.getsize(image_path) / 1024
logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb")
@app.post("/albums/{album}/photos", response_class=UJSONResponse, response_model=Photo, description="Upload a photo to album")
@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo)
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@ -195,7 +195,7 @@ async def photo_patch(id: str, caption: str, current_user: User = Security(get_c
}
)
@app.delete("/photos/{id}", description="Delete a photo by id")
@app.delete("/photos/{id}", description="Delete a photo by id", status_code=HTTP_204_NO_CONTENT)
async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
try:
@ -214,7 +214,7 @@ async def photo_delete(id: str, current_user: User = Security(get_current_active
return Response(status_code=HTTP_204_NO_CONTENT)
@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResults)
@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto)
async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@ -246,7 +246,7 @@ async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str,
images = list(col_photos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING))
for image in images:
output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"]})
output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"], "caption": image["caption"]})
if col_photos.count_documents( db_query_count ) > page*page_size:
token = str(token_urlsafe(32))
@ -257,7 +257,7 @@ async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str,
return UJSONResponse(output)
@app.get("/albums/{album}/photos/token", description="Find a photo by token", response_class=UJSONResponse, response_model=SearchResults)
@app.get("/albums/{album}/photos/token", description="Find a photo by token", response_class=UJSONResponse, response_model=SearchResultsPhoto)
async def photo_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} )

View File

@ -50,7 +50,7 @@ if configGet("registration_requires_confirmation") is True:
return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} )
if configGet("registration_enabled") is True:
@app.post("/users")
@app.post("/users", status_code=HTTP_204_NO_CONTENT)
async def user_create(user: str = Form(), email: str = Form(), password: str = Form()):
if col_users.find_one( {"user": user} ) is not None:
raise HTTPException(HTTP_406_NOT_ACCEPTABLE, detail=configGet("user_already_exists", "messages"))
@ -60,7 +60,7 @@ if configGet("registration_enabled") is True:
return Response(status_code=HTTP_204_NO_CONTENT)
@app.delete("/users/me/")
@app.delete("/users/me/", status_code=HTTP_204_NO_CONTENT)
async def user_delete(password: str = Form(), current_user: User = Depends(get_current_active_user)):
user = get_user(current_user.user)
if not user:

View File

@ -6,7 +6,7 @@ from typing import Union
from magic import Magic
from datetime import datetime, timezone
from os import makedirs, path, remove
from classes.models import Video, SearchResults
from classes.models import Video, SearchResultsVideo
#from modules.unified_exif_reader import extract_location
from modules.security import User, get_current_active_user
from modules.app import app
@ -151,7 +151,7 @@ async def video_patch(id: str, caption: str, current_user: User = Security(get_c
}
)
@app.delete("/videos/{id}", description="Delete a video by id")
@app.delete("/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT)
async def video_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try:
@ -167,7 +167,7 @@ async def video_delete(id: str, current_user: User = Security(get_current_active
return Response(status_code=HTTP_204_NO_CONTENT)
@app.get("/albums/{album}/videos", response_class=UJSONResponse, response_model=SearchResults, description="Find a video by filename")
@app.get("/albums/{album}/videos", description="Find a video by filename", response_class=UJSONResponse, response_model=SearchResultsVideo)
async def video_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@ -195,7 +195,7 @@ async def video_find(album: str, q: Union[str, None] = None, caption: Union[str,
videos = list(col_videos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING))
for video in videos:
output["results"].append({"id": video["_id"].__str__(), "filename": video["filename"]})
output["results"].append({"id": video["_id"].__str__(), "filename": video["filename"], "caption": video["caption"]})
if col_videos.count_documents( db_query_count ) > page*page_size:
token = str(token_urlsafe(32))
@ -206,7 +206,7 @@ async def video_find(album: str, q: Union[str, None] = None, caption: Union[str,
return UJSONResponse(output)
@app.get("/albums/{album}/videos/token", response_class=UJSONResponse, response_model=SearchResults, description="Find a video by token")
@app.get("/albums/{album}/videos/token", description="Find a video by token", response_class=UJSONResponse, response_model=SearchResultsVideo)
async def video_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} )