"""HTTP endpoints for creating, searching, modifying and deleting albums.

Album metadata lives in ``col_albums``, photos reference their album by name
in ``col_photos``, and every album has a directory on disk under
``data/users/<user>/albums/<name>``.
"""

import re
from os import makedirs, rename
from pathlib import Path
from shutil import rmtree
from typing import Union

from bson.errors import InvalidId
from bson.objectid import ObjectId
from fastapi import Security
from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT

from classes.exceptions import (
    AlbumAlreadyExistsError,
    AlbumIncorrectError,
    AlbumNotFoundError,
)
from classes.models import Album, AlbumModified, SearchResultsAlbum
from modules.app import app
from modules.database import col_albums, col_photos
from modules.security import User, get_current_active_user

# Album names may only consist of lowercase latin letters, digits and "_".
# Used with ``fullmatch`` so no anchors are needed and a trailing newline is
# rejected as well.
_ALBUM_NAME_PATTERN = re.compile(r"[a-z0-9_]*")


def _validate_name(name: str) -> None:
    """Raise ``AlbumIncorrectError`` unless *name* is a valid album name.

    A valid name contains only ``a-z``, ``0-9`` and ``_`` and is between
    2 and 20 characters long (inclusive).
    """
    # ``re`` functions return ``None`` (never ``False``) when nothing matches.
    if _ALBUM_NAME_PATTERN.fullmatch(name) is None:
        raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
    # A chained ``2 > len(name) > 20`` can never be true; test the range itself.
    if not 2 <= len(name) <= 20:
        raise AlbumIncorrectError("name", "must be >2 and <20 characters.")


def _validate_title(title: str) -> None:
    """Raise ``AlbumIncorrectError`` unless *title* is 2 to 40 characters long."""
    if not 2 <= len(title) <= 40:
        raise AlbumIncorrectError("title", "must be >2 and <40 characters.")


def _album_dir(current_user: User, name: str) -> Path:
    """Return the on-disk directory of album *name* owned by *current_user*."""
    return Path(f"data/users/{current_user.user}/albums/{name}")


def _parse_album_id(id: str) -> ObjectId:
    """Convert *id* to an ``ObjectId``; a malformed id means "album not found"."""
    try:
        return ObjectId(id)
    except InvalidId as exc:
        raise AlbumNotFoundError(id) from exc


async def _find_cover_id(cover: str, album_name: str) -> Union[str, None]:
    """Return *cover* as a string id if that photo belongs to *album_name*.

    Returns ``None`` when *cover* is not a valid ObjectId or when no photo
    with that id exists in the album.
    """
    try:
        cover_id = ObjectId(cover)
    except InvalidId:
        # A malformed id cannot match any photo; treat it like a missing one
        # instead of letting the request fail with an unhandled exception.
        return None
    image = await col_photos.find_one({"_id": cover_id, "album": album_name})
    return None if image is None else str(image["_id"])


album_create_responses = {
    406: AlbumIncorrectError("name", "error").openapi,
    409: AlbumAlreadyExistsError("name").openapi,
}


@app.post(
    "/albums",
    description="Create album with name and title",
    response_class=UJSONResponse,
    response_model=Album,
    responses=album_create_responses,
)
async def album_create(
    name: str,
    title: str,
    current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
    """Create an album document and its directory for the current user.

    Raises:
        AlbumIncorrectError: *name* or *title* fails validation.
        AlbumAlreadyExistsError: an album called *name* already exists.
    """
    _validate_name(name)
    _validate_title(title)

    if (await col_albums.find_one({"name": name})) is not None:
        raise AlbumAlreadyExistsError(name)

    makedirs(_album_dir(current_user, name), exist_ok=True)

    uploaded = await col_albums.insert_one(
        {"user": current_user.user, "name": name, "title": title, "cover": None}
    )

    return UJSONResponse(
        {"id": str(uploaded.inserted_id), "name": name, "title": title}
    )


@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(
    q: str,
    current_user: User = Security(get_current_active_user, scopes=["albums.list"]),
):
    """Return the current user's albums whose name matches the regex *q*."""
    # NOTE(review): ``q`` is compiled as a regular expression verbatim, so an
    # invalid pattern raises ``re.error`` here. Confirm whether callers rely on
    # regex syntax before switching to ``re.escape(q)``.
    results = [
        {
            "id": str(album["_id"]),
            "name": album["name"],
            "title": album["title"],
        }
        async for album in col_albums.find(
            {"user": current_user.user, "name": re.compile(q)}
        )
    ]

    return UJSONResponse({"results": results})


album_patch_responses = {
    404: AlbumNotFoundError("id").openapi,
    406: AlbumIncorrectError("name", "error").openapi,
}


@app.patch(
    "/albums/{id}",
    description="Modify album's name or title by id",
    response_class=UJSONResponse,
    response_model=AlbumModified,
    responses=album_patch_responses,
)
async def album_patch(
    id: str,
    name: Union[str, None] = None,
    title: Union[str, None] = None,
    cover: Union[str, None] = None,
    current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
    """Partially update an album; omitted fields keep their stored values.

    If *cover* does not identify a photo of this album, the stored cover is
    kept.

    Raises:
        AlbumNotFoundError: *id* is malformed or the current user has no such
            album.
        AlbumIncorrectError: *name* or *title* fails validation.
    """
    album_id = _parse_album_id(id)
    # Albums are stored with their owner (see album_create) and their files
    # live under the owner's directory, so only the owner's album may match.
    album = await col_albums.find_one({"_id": album_id, "user": current_user.user})
    if album is None:
        raise AlbumNotFoundError(id)

    # Validate everything before touching the disk or the photo collection.
    if title is None:
        title = album["title"]
    else:
        _validate_title(title)

    if name is not None:
        _validate_name(name)

    # Resolve the cover while photos still carry the old album name; doing it
    # after the rename below would never find the photo.
    new_cover = None if cover is None else await _find_cover_id(cover, album["name"])
    cover = new_cover if new_cover is not None else album["cover"]

    if name is None:
        name = album["name"]
    else:
        rename(
            _album_dir(current_user, album["name"]),
            _album_dir(current_user, name),
        )
        await col_photos.update_many(
            {"user": current_user.user, "album": album["name"]},
            {"$set": {"album": name}},
        )

    await col_albums.update_one(
        {"_id": album_id}, {"$set": {"name": name, "title": title, "cover": cover}}
    )

    return UJSONResponse({"name": name, "title": title, "cover": cover})


album_put_responses = {
    404: AlbumNotFoundError("id").openapi,
    406: AlbumIncorrectError("name", "error").openapi,
}


@app.put(
    "/albums/{id}",
    description="Modify album's name and title by id",
    response_class=UJSONResponse,
    response_model=AlbumModified,
    responses=album_put_responses,
)
async def album_put(
    id: str,
    name: str,
    title: str,
    cover: str,
    current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
    """Replace an album's name, title and cover.

    If *cover* does not identify a photo of this album, the cover is reset
    to ``None``.

    Raises:
        AlbumNotFoundError: *id* is malformed or the current user has no such
            album.
        AlbumIncorrectError: *name* or *title* fails validation.
    """
    album_id = _parse_album_id(id)
    # Only the owner's album may match; its directory is under their folder.
    album = await col_albums.find_one({"_id": album_id, "user": current_user.user})
    if album is None:
        raise AlbumNotFoundError(id)

    _validate_name(name)
    _validate_title(title)

    # Looked up before the rename, while photos still carry the old name.
    cover = await _find_cover_id(cover, album["name"])  # type: ignore

    rename(
        _album_dir(current_user, album["name"]),
        _album_dir(current_user, name),
    )

    await col_photos.update_many(
        {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}}
    )

    await col_albums.update_one(
        {"_id": album_id}, {"$set": {"name": name, "title": title, "cover": cover}}
    )

    return UJSONResponse({"name": name, "title": title, "cover": cover})


album_delete_responses = {404: AlbumNotFoundError("id").openapi}


# NOTE(review): this route is "/album/{id}" (singular) while the other
# endpoints use "/albums/{id}". Left unchanged because existing clients may
# depend on the path.
@app.delete(
    "/album/{id}",
    description="Delete album by id",
    status_code=HTTP_204_NO_CONTENT,
    responses=album_delete_responses,
)
async def album_delete(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
    """Delete an album, its photo documents and its directory.

    Raises:
        AlbumNotFoundError: *id* is malformed or the current user has no such
            album.
    """
    # Scoped to the owner so another user's album document is never removed.
    album = await col_albums.find_one_and_delete(
        {"_id": _parse_album_id(id), "user": current_user.user}
    )
    if album is None:
        raise AlbumNotFoundError(id)

    await col_photos.delete_many({"user": current_user.user, "album": album["name"]})

    rmtree(_album_dir(current_user, album["name"]))

    return Response(status_code=HTTP_204_NO_CONTENT)