import re from os import makedirs, path, rename from shutil import rmtree from typing import Union from bson.errors import InvalidId from bson.objectid import ObjectId from fastapi import Security from fastapi.responses import Response, UJSONResponse from starlette.status import HTTP_204_NO_CONTENT from classes.exceptions import ( AlbumAlreadyExistsError, AlbumIncorrectError, AlbumNotFoundError, ) from classes.models import Album, AlbumModified, SearchResultsAlbum from modules.app import app from modules.database import col_albums, col_photos from modules.security import User, get_current_active_user album_create_responses = { 406: AlbumIncorrectError("name", "error").openapi, 409: AlbumAlreadyExistsError("name").openapi, } @app.post( "/albums", description="Create album with name and title", response_class=UJSONResponse, response_model=Album, responses=album_create_responses, ) async def album_create( name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"]), ): if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.") if 2 > len(name) > 20: raise AlbumIncorrectError("name", "must be >2 and <20 characters.") if 2 > len(title) > 40: raise AlbumIncorrectError("title", "must be >2 and <40 characters.") if col_albums.find_one({"name": name}) is not None: raise AlbumAlreadyExistsError(name) makedirs( path.join("data", "users", current_user.user, "albums", name), exist_ok=True ) uploaded = col_albums.insert_one( {"user": current_user.user, "name": name, "title": title, "cover": None} ) return UJSONResponse( {"id": uploaded.inserted_id.__str__(), "name": name, "title": title} ) @app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum) async def album_find( q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"]), ): output = {"results": []} albums = list(col_albums.find({"user": current_user.user, "name": re.compile(q)})) for album in albums: output["results"].append( { "id": album["_id"].__str__(), "name": album["name"], "title": album["title"], } ) return UJSONResponse(output) album_patch_responses = { 404: AlbumNotFoundError("id").openapi, 406: AlbumIncorrectError("name", "error").openapi, } @app.patch( "/albums/{id}", description="Modify album's name or title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_patch_responses, ) async def album_patch( id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"]), ): try: album = col_albums.find_one({"_id": ObjectId(id)}) if album is None: raise InvalidId(id) except InvalidId: raise AlbumNotFoundError(id) if title is not None: if 2 > len(title) > 40: raise AlbumIncorrectError("title", "must be >2 and <40 characters.") else: title = album["title"] if name is not None: if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: raise AlbumIncorrectError( "name", "can only contain a-z, 0-9 and _ characters." ) if 2 > len(name) > 20: raise AlbumIncorrectError("name", "must be >2 and <20 characters.") rename( path.join("data", "users", current_user.user, "albums", album["name"]), path.join("data", "users", current_user.user, "albums", name), ) col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}}, ) else: name = album["name"] if cover is not None: image = col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]}) cover = image["_id"].__str__() if image is not None else album["cover"] else: cover = album["cover"] col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} ) return UJSONResponse({"name": name, "title": title, "cover": cover}) album_put_responses = { 404: AlbumNotFoundError("id").openapi, 406: AlbumIncorrectError("name", "error").openapi, } @app.put( "/albums/{id}", description="Modify album's name and title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_put_responses, ) async def album_put( id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"]), ): try: album = col_albums.find_one({"_id": ObjectId(id)}) if album is None: raise InvalidId(id) except InvalidId: raise AlbumNotFoundError(id) if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.") if 2 > len(name) > 20: raise AlbumIncorrectError("name", "must be >2 and <20 characters.") if 2 > len(title) > 40: raise AlbumIncorrectError("title", "must be >2 and <40 characters.") image = col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]}) cover = image["_id"].__str__() if image is not None else None # type: ignore rename( path.join("data", "users", current_user.user, "albums", album["name"]), path.join("data", "users", current_user.user, "albums", name), ) col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} ) col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} ) return UJSONResponse({"name": name, "title": title, "cover": cover}) album_delete_responses = {404: AlbumNotFoundError("id").openapi} @app.delete( "/album/{id}", description="Delete album by id", status_code=HTTP_204_NO_CONTENT, responses=album_delete_responses, ) async def album_delete( id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"]), ): try: album = col_albums.find_one_and_delete({"_id": ObjectId(id)}) if album is None: raise InvalidId(id) except InvalidId: raise AlbumNotFoundError(id) col_photos.delete_many({"album": album["name"]}) rmtree(path.join("data", "users", current_user.user, "albums", album["name"])) return Response(status_code=HTTP_204_NO_CONTENT)