3 Commits

Author SHA1 Message Date
3520912aae Exceptions and type handling done 2023-02-16 15:44:54 +01:00
b285fc0668 Updated Video model 2023-02-16 15:33:06 +01:00
7580478ac3 Added docstrings 2023-02-16 15:32:56 +01:00
6 changed files with 153 additions and 40 deletions

View File

@@ -2,6 +2,7 @@ from typing import Literal
class AlbumNotFoundError(Exception): class AlbumNotFoundError(Exception):
"""Raises HTTP 404 if no album with this ID found."""
def __init__(self, id: str): def __init__(self, id: str):
self.id = id self.id = id
self.openapi = { self.openapi = {
@@ -16,6 +17,7 @@ class AlbumNotFoundError(Exception):
} }
class AlbumNameNotFoundError(Exception): class AlbumNameNotFoundError(Exception):
"""Raises HTTP 404 if no album with this name found."""
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
self.openapi = { self.openapi = {
@@ -30,6 +32,7 @@ class AlbumNameNotFoundError(Exception):
} }
class AlbumAlreadyExistsError(Exception): class AlbumAlreadyExistsError(Exception):
"""Raises HTTP 409 if album with this name already exists."""
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
self.openapi = { self.openapi = {
@@ -44,6 +47,7 @@ class AlbumAlreadyExistsError(Exception):
} }
class AlbumIncorrectError(Exception): class AlbumIncorrectError(Exception):
"""Raises HTTP 406 if album's title or name is invalid."""
def __init__(self, place: Literal["name", "title"], error: str) -> None: def __init__(self, place: Literal["name", "title"], error: str) -> None:
self.place = place self.place = place
self.error = error self.error = error
@@ -59,6 +63,7 @@ class AlbumIncorrectError(Exception):
} }
class PhotoNotFoundError(Exception): class PhotoNotFoundError(Exception):
"""Raises HTTP 404 if no photo with this ID found."""
def __init__(self, id: str): def __init__(self, id: str):
self.id = id self.id = id
self.openapi = { self.openapi = {
@@ -72,7 +77,51 @@ class PhotoNotFoundError(Exception):
} }
} }
class PhotoSearchQueryEmptyError(Exception):
"""Raises HTTP 422 if no photo search query provided."""
def __init__(self):
self.openapi = {
"description": "Invalid Query",
"content": {
"application/json": {
"example": {
"detail": "You must provide query, caption or coordinates to look for photos."
}
}
}
}
class VideoNotFoundError(Exception):
"""Raises HTTP 404 if no video with this ID found."""
def __init__(self, id: str):
self.id = id
self.openapi = {
"description": "Video Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find video with id '{id}'."
}
}
}
}
class VideoSearchQueryEmptyError(Exception):
"""Raises HTTP 422 if no video search query provided."""
def __init__(self):
self.openapi = {
"description": "Invalid Query",
"content": {
"application/json": {
"example": {
"detail": "You must provide query or caption to look for videos."
}
}
}
}
class SearchPageInvalidError(Exception): class SearchPageInvalidError(Exception):
"""Raises HTTP 400 if page or page size are not in valid range."""
def __init__(self): def __init__(self):
self.openapi = { self.openapi = {
"description": "Invalid Page", "description": "Invalid Page",
@@ -86,6 +135,7 @@ class SearchPageInvalidError(Exception):
} }
class SearchTokenInvalidError(Exception): class SearchTokenInvalidError(Exception):
"""Raises HTTP 401 if search token is not valid."""
def __init__(self): def __init__(self):
self.openapi = { self.openapi = {
"description": "Invalid Token", "description": "Invalid Token",
@@ -99,6 +149,7 @@ class SearchTokenInvalidError(Exception):
} }
class UserEmailCodeInvalid(Exception): class UserEmailCodeInvalid(Exception):
"""Raises HTTP 400 if email confirmation code is not valid."""
def __init__(self): def __init__(self):
self.openapi = { self.openapi = {
"description": "Invalid Email Code", "description": "Invalid Email Code",
@@ -112,6 +163,7 @@ class UserEmailCodeInvalid(Exception):
} }
class UserAlreadyExists(Exception): class UserAlreadyExists(Exception):
"""Raises HTTP 409 if user with this name already exists."""
def __init__(self): def __init__(self):
self.openapi = { self.openapi = {
"description": "User Already Exists", "description": "User Already Exists",
@@ -125,6 +177,7 @@ class UserAlreadyExists(Exception):
} }
class UserCredentialsInvalid(Exception): class UserCredentialsInvalid(Exception):
"""Raises HTTP 401 if user credentials are not valid."""
def __init__(self): def __init__(self):
self.openapi = { self.openapi = {
"description": "Invalid Credentials", "description": "Invalid Credentials",

View File

@@ -1,6 +1,7 @@
from typing import List, Union from typing import List, Union
from pydantic import BaseModel from pydantic import BaseModel
class Photo(BaseModel): class Photo(BaseModel):
id: str id: str
album: str album: str
@@ -17,17 +18,24 @@ class PhotoSearch(BaseModel):
filename: str filename: str
caption: Union[str, None] caption: Union[str, None]
class Video(BaseModel): class Video(BaseModel):
id: str id: str
album: str album: str
hash: str hash: str
filename: str filename: str
class VideoPublic(BaseModel):
id: str
caption: str
filename: str
class VideoSearch(BaseModel): class VideoSearch(BaseModel):
id: str id: str
filename: str filename: str
caption: Union[str, None] caption: Union[str, None]
class Album(BaseModel): class Album(BaseModel):
id: str id: str
name: str name: str
@@ -43,6 +51,7 @@ class AlbumModified(BaseModel):
title: str title: str
cover: Union[str, None] cover: Union[str, None]
class SearchResultsAlbum(BaseModel): class SearchResultsAlbum(BaseModel):
results: List[Album] results: List[Album]
next_page: Union[str, None] next_page: Union[str, None]
@@ -53,7 +62,4 @@ class SearchResultsPhoto(BaseModel):
class SearchResultsVideo(BaseModel): class SearchResultsVideo(BaseModel):
results: List[VideoSearch] results: List[VideoSearch]
next_page: Union[str, None] next_page: Union[str, None]
class EmailConfirmed(BaseModel):
detail: str

View File

@@ -2,7 +2,7 @@ from fastapi import Request
from fastapi.responses import UJSONResponse from fastapi.responses import UJSONResponse
from modules.app import app from modules.app import app
from classes.exceptions import * from classes.exceptions import *
from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY
@app.exception_handler(AlbumNotFoundError) @app.exception_handler(AlbumNotFoundError)
async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError): async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError):
@@ -32,6 +32,27 @@ async def photo_not_found_exception_handler(request: Request, exc: PhotoNotFound
content={"detail": f"Could not find photo with id '{exc.id}'."}, content={"detail": f"Could not find photo with id '{exc.id}'."},
) )
@app.exception_handler(PhotoSearchQueryEmptyError)
async def photo_search_query_empty_exception_handler(request: Request, exc: PhotoSearchQueryEmptyError):
return UJSONResponse(
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": "You must provide query, caption or coordinates to look for photos."},
)
@app.exception_handler(VideoNotFoundError)
async def video_not_found_exception_handler(request: Request, exc: VideoNotFoundError):
return UJSONResponse(
status_code=HTTP_404_NOT_FOUND,
content={"detail": f"Could not find video with id '{exc.id}'."},
)
@app.exception_handler(VideoSearchQueryEmptyError)
async def video_search_query_empty_exception_handler(request: Request, exc: VideoSearchQueryEmptyError):
return UJSONResponse(
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": "You must provide query, caption or coordinates to look for photos."},
)
@app.exception_handler(SearchPageInvalidError) @app.exception_handler(SearchPageInvalidError)
async def search_page_invalid_exception_handler(request: Request, exc: SearchPageInvalidError): async def search_page_invalid_exception_handler(request: Request, exc: SearchPageInvalidError):
return UJSONResponse( return UJSONResponse(

View File

@@ -7,7 +7,7 @@ from typing import Union
from magic import Magic from magic import Magic
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system from os import makedirs, path, remove, system
from classes.exceptions import AlbumNameNotFoundError, PhotoNotFoundError, SearchPageInvalidError, SearchTokenInvalidError from classes.exceptions import AlbumNameNotFoundError, PhotoNotFoundError, PhotoSearchQueryEmptyError, SearchPageInvalidError, SearchTokenInvalidError
from classes.models import Photo, PhotoPublic, SearchResultsPhoto from classes.models import Photo, PhotoPublic, SearchResultsPhoto
from modules.exif_reader import extract_location from modules.exif_reader import extract_location
from modules.hasher import get_phash, get_duplicates from modules.hasher import get_phash, get_duplicates
@@ -20,9 +20,9 @@ from bson.objectid import ObjectId
from bson.errors import InvalidId from bson.errors import InvalidId
from plum.exceptions import UnpackError from plum.exceptions import UnpackError
from fastapi import HTTPException, UploadFile, Security from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY from starlette.status import HTTP_204_NO_CONTENT, HTTP_409_CONFLICT
from modules.utils import logWrite from modules.utils import logWrite
@@ -52,7 +52,7 @@ async def compress_image(image_path: str):
size_after = path.getsize(image_path) / 1024 size_after = path.getsize(image_path) / 1024
logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb") logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb")
photo_post_reponses = { photo_post_responses = {
404: AlbumNameNotFoundError("name").openapi, 404: AlbumNameNotFoundError("name").openapi,
409: { 409: {
"description": "Image Duplicates Found", "description": "Image Duplicates Found",
@@ -68,7 +68,7 @@ photo_post_reponses = {
} }
} }
} }
@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo, responses=photo_post_reponses) @app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo, responses=photo_post_responses)
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@@ -142,7 +142,10 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F
} }
) )
@app.get("/photos/{id}", description="Get a photo by id") photo_get_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses)
async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.read"])): async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.read"])):
try: try:
@@ -243,11 +246,12 @@ async def photo_delete(id: str, current_user: User = Security(get_current_active
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
photo_find_reponses = { photo_find_responses = {
400: SearchPageInvalidError().openapi, 400: SearchPageInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi 404: AlbumNameNotFoundError("name").openapi,
422: PhotoSearchQueryEmptyError().openapi
} }
@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_reponses) @app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_responses)
async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])): async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@@ -265,7 +269,7 @@ async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str,
db_query = {"user": current_user.user, "album": album, "location": { "$nearSphere": {"$geometry": {"type": "Point", "coordinates": [lng, lat]}, "$maxDistance": radius} } } db_query = {"user": current_user.user, "album": album, "location": { "$nearSphere": {"$geometry": {"type": "Point", "coordinates": [lng, lat]}, "$maxDistance": radius} } }
db_query_count = {"user": current_user.user, "album": album, "location": { "$geoWithin": { "$centerSphere": [ [lng, lat], radius ] } } } db_query_count = {"user": current_user.user, "album": album, "location": { "$geoWithin": { "$centerSphere": [ [lng, lat], radius ] } } }
elif q is None and caption is None: elif q is None and caption is None:
raise HTTPException(status_code=HTTP_422_UNPROCESSABLE_ENTITY, detail="You must provide query, caption or coordinates to look for photos") raise PhotoSearchQueryEmptyError()
elif q is None and caption is not None: elif q is None and caption is not None:
db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)} db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)}
db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(caption)} db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(caption)}
@@ -290,7 +294,10 @@ async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str,
return UJSONResponse(output) return UJSONResponse(output)
@app.get("/albums/{album}/photos/token", description="Find a photo by token", response_class=UJSONResponse, response_model=SearchResultsPhoto) photo_find_token_responses = {
401: SearchTokenInvalidError().openapi
}
@app.get("/albums/{album}/photos/token", description="Find a photo by token", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_token_responses)
async def photo_find_token(token: str): async def photo_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} ) found_record = col_tokens.find_one( {"token": token} )

View File

@@ -54,8 +54,8 @@ user_confirm_responses = {
400: UserEmailCodeInvalid().openapi 400: UserEmailCodeInvalid().openapi
} }
if configGet("registration_requires_confirmation") is True: if configGet("registration_requires_confirmation") is True:
@app.get("/users/{user}/confirm", response_class=UJSONResponse, response_model=EmailConfirmed, responses=user_confirm_responses) @app.get("/users/{user}/confirm", response_class=UJSONResponse, responses=user_confirm_responses)
@app.patch("/users/{user}/confirm", response_class=UJSONResponse, response_model=EmailConfirmed, responses=user_confirm_responses) @app.patch("/users/{user}/confirm", response_class=UJSONResponse, responses=user_confirm_responses)
async def user_confirm(user: str, code: str): async def user_confirm(user: str, code: str):
confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} ) confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} )
if confirm_record is None: if confirm_record is None:

View File

@@ -6,7 +6,8 @@ from typing import Union
from magic import Magic from magic import Magic
from datetime import datetime, timezone from datetime import datetime, timezone
from os import makedirs, path, remove from os import makedirs, path, remove
from classes.models import Video, SearchResultsVideo from classes.exceptions import AlbumNameNotFoundError, SearchPageInvalidError, SearchTokenInvalidError, VideoNotFoundError, VideoSearchQueryEmptyError
from classes.models import Video, SearchResultsVideo, VideoPublic
#from modules.unified_exif_reader import extract_location #from modules.unified_exif_reader import extract_location
from modules.security import User, get_current_active_user from modules.security import User, get_current_active_user
from modules.app import app from modules.app import app
@@ -15,16 +16,18 @@ from bson.objectid import ObjectId
from bson.errors import InvalidId from bson.errors import InvalidId
from pymongo import DESCENDING from pymongo import DESCENDING
from fastapi import HTTPException, UploadFile, Security from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_422_UNPROCESSABLE_ENTITY from starlette.status import HTTP_204_NO_CONTENT
video_post_responses = {
@app.post("/albums/{album}/videos", response_class=UJSONResponse, response_model=Video, description="Upload a video to album") 404: AlbumNameNotFoundError("name").openapi
}
@app.post("/albums/{album}/videos", description="Upload a video to album", response_class=UJSONResponse, response_model=Video, responses=video_post_responses)
async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") raise AlbumNameNotFoundError(album)
# if not file.content_type.startswith("video"): # if not file.content_type.startswith("video"):
# raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not a video, not accepting.") # raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not a video, not accepting.")
@@ -76,11 +79,15 @@ async def video_upload(file: UploadFile, album: str, caption: Union[str, None] =
{ {
"id": uploaded.inserted_id.__str__(), "id": uploaded.inserted_id.__str__(),
"album": album, "album": album,
"hash": "", # SHOULD BE DONE
"filename": filename "filename": filename
} }
) )
@app.get("/videos/{id}", description="Get a video by id") video_get_responses = {
404: VideoNotFoundError("id").openapi
}
@app.get("/videos/{id}", description="Get a video by id", responses=video_get_responses)
async def video_get(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.read"])): async def video_get(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.read"])):
try: try:
@@ -88,7 +95,7 @@ async def video_get(id: str, current_user: User = Security(get_current_active_us
if video is None: if video is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find a video with such id.") raise VideoNotFoundError(id)
video_path = path.join("data", "users", current_user.user, "albums", video["album"], video["filename"]) video_path = path.join("data", "users", current_user.user, "albums", video["album"], video["filename"])
@@ -98,7 +105,10 @@ async def video_get(id: str, current_user: User = Security(get_current_active_us
return Response(video_file, media_type=mime) return Response(video_file, media_type=mime)
@app.put("/videos/{id}", description="Move a video into another album") video_move_responses = {
404: VideoNotFoundError("id").openapi
}
@app.put("/videos/{id}", description="Move a video into another album", response_model=VideoPublic, responses=video_move_responses)
async def video_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): async def video_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try: try:
@@ -106,10 +116,10 @@ async def video_move(id: str, album: str, current_user: User = Security(get_curr
if video is None: if video is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an video with such id.") raise VideoNotFoundError(id)
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") raise AlbumNameNotFoundError(album)
if path.exists(path.join("data", "users", current_user.user, "albums", album, video["filename"])): if path.exists(path.join("data", "users", current_user.user, "albums", album, video["filename"])):
base_name = video["filename"].split(".")[:-1] base_name = video["filename"].split(".")[:-1]
@@ -128,11 +138,15 @@ async def video_move(id: str, album: str, current_user: User = Security(get_curr
return UJSONResponse( return UJSONResponse(
{ {
"id": video["_id"].__str__(), "id": video["_id"].__str__(),
"caption": video["caption"],
"filename": filename "filename": filename
} }
) )
@app.patch("/videos/{id}", description="Change properties of a video") video_patch_responses = {
404: VideoNotFoundError("id").openapi
}
@app.patch("/videos/{id}", description="Change properties of a video", response_model=VideoPublic, responses=video_patch_responses)
async def video_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): async def video_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try: try:
@@ -140,18 +154,22 @@ async def video_patch(id: str, caption: str, current_user: User = Security(get_c
if video is None: if video is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an video with such id.") raise VideoNotFoundError(id)
col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} ) col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} )
return UJSONResponse( return UJSONResponse(
{ {
"id": video["_id"].__str__(), "id": video["_id"].__str__(),
"filename": caption "caption": video["caption"],
"filename": video["filename"]
} }
) )
@app.delete("/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT) video_delete_responses = {
404: VideoNotFoundError("id").openapi
}
@app.delete("/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT, responses=video_delete_responses)
async def video_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): async def video_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try: try:
@@ -159,7 +177,7 @@ async def video_delete(id: str, current_user: User = Security(get_current_active
if video is None: if video is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find a video with such id.") raise VideoNotFoundError(id)
album = col_albums.find_one( {"name": video["album"]} ) album = col_albums.find_one( {"name": video["album"]} )
@@ -167,20 +185,25 @@ async def video_delete(id: str, current_user: User = Security(get_current_active
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
@app.get("/albums/{album}/videos", description="Find a video by filename", response_class=UJSONResponse, response_model=SearchResultsVideo) video_find_responses = {
400: SearchPageInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi,
422: VideoSearchQueryEmptyError().openapi
}
@app.get("/albums/{album}/videos", description="Find a video by filename", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_responses)
async def video_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"])): async def video_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") raise AlbumNameNotFoundError(album)
if page <= 0 or page_size <= 0: if page <= 0 or page_size <= 0:
raise HTTPException(status_code=HTTP_400_BAD_REQUEST, detail="Parameters 'page' and 'page_size' must be greater or equal to 1.") raise SearchPageInvalidError()
output = {"results": []} output = {"results": []}
skip = (page-1)*page_size skip = (page-1)*page_size
if q is None and caption is None: if q is None and caption is None:
raise HTTPException(status_code=HTTP_422_UNPROCESSABLE_ENTITY, detail="You must provide query or caption to look for videos") raise VideoSearchQueryEmptyError()
if q is None and caption is not None: if q is None and caption is not None:
db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)} db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)}
@@ -206,12 +229,15 @@ async def video_find(album: str, q: Union[str, None] = None, caption: Union[str,
return UJSONResponse(output) return UJSONResponse(output)
@app.get("/albums/{album}/videos/token", description="Find a video by token", response_class=UJSONResponse, response_model=SearchResultsVideo) video_find_token_responses = {
401: SearchTokenInvalidError().openapi
}
@app.get("/albums/{album}/videos/token", description="Find a video by token", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_token_responses)
async def video_find_token(token: str): async def video_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} ) found_record = col_tokens.find_one( {"token": token} )
if found_record is None: if found_record is None:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid search token.") raise SearchTokenInvalidError()
return await video_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"])) return await video_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"]))