"""HTTP routes for creating, searching, modifying and deleting photo albums.

Each album is a document in ``col_albums`` plus a directory at
``data/users/<user>/albums/<name>``; photos reference their album by name
through the ``album`` field of ``col_photos`` documents.
"""

import re
from os import makedirs, path, rename
from shutil import rmtree
from typing import Union

from bson.errors import InvalidId
from bson.objectid import ObjectId
from fastapi import HTTPException, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT

from classes.exceptions import AlbumAlreadyExistsError, AlbumIncorrectError, AlbumNotFoundError
from classes.models import Album, AlbumModified, SearchResultsAlbum
from modules.app import app
from modules.database import col_photos, col_albums
from modules.security import User, get_current_active_user

# Album names become directory names, so only a strict whitelist is allowed.
# Used with ``fullmatch`` so that a trailing newline cannot slip past ``$``.
_ALBUM_NAME_PATTERN = re.compile(r"[a-z0-9_]*")


def _validate_album_name(name: str) -> None:
    """Raise AlbumIncorrectError unless ``name`` is a valid album name.

    A valid name consists only of ``a-z``, ``0-9`` and ``_`` and is between
    2 and 20 characters long (bounds inclusive).
    """
    # re.search()/fullmatch() return a match object or None, never False.
    if _ALBUM_NAME_PATTERN.fullmatch(name) is None:
        raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
    # NOTE(review): the message reads as exclusive bounds; the bounds here
    # follow the operands of the original comparison (2 and 20 are accepted).
    if not 2 <= len(name) <= 20:
        raise AlbumIncorrectError("name", "must be >2 and <20 characters.")


def _validate_album_title(title: str) -> None:
    """Raise AlbumIncorrectError unless ``title`` is 2 to 40 characters long."""
    if not 2 <= len(title) <= 40:
        raise AlbumIncorrectError("title", "must be >2 and <40 characters.")


def _get_owned_album(album_id: str, user: str) -> dict:
    """Return the album document with ``album_id`` that belongs to ``user``.

    The lookup is restricted to ``user`` because every filesystem path in
    this module is built from the current user's name.

    Raises:
        AlbumNotFoundError: if ``album_id`` is not a valid ObjectId or no
            matching album exists for ``user``.
    """
    try:
        album = col_albums.find_one({"user": user, "_id": ObjectId(album_id)})
    except InvalidId:
        raise AlbumNotFoundError(album_id) from None
    if album is None:
        raise AlbumNotFoundError(album_id)
    return album


def _resolve_cover(cover: str, album_name: str, fallback: Union[str, None]) -> Union[str, None]:
    """Return ``cover`` as a string id if it names a photo of ``album_name``.

    ``fallback`` is returned when ``cover`` is not a valid ObjectId or no
    photo with that id is stored in the album.
    """
    try:
        image = col_photos.find_one({"_id": ObjectId(cover), "album": album_name})
    except InvalidId:
        image = None
    return str(image["_id"]) if image is not None else fallback


def _album_dir(user: str, name: str) -> str:
    """Return the directory path that holds album ``name`` of ``user``."""
    return path.join("data", "users", user, "albums", name)


album_create_responses = {
    406: AlbumIncorrectError("name", "error").openapi,
    409: AlbumAlreadyExistsError("name").openapi
}


@app.post("/albums", description="Create album with name and title", response_class=UJSONResponse, response_model=Album, responses=album_create_responses)
async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Create an album directory and database record for the current user.

    Raises:
        AlbumIncorrectError: if ``name`` or ``title`` fails validation.
        AlbumAlreadyExistsError: if an album called ``name`` already exists.
    """
    _validate_album_name(name)
    _validate_album_title(title)

    if col_albums.find_one({"name": name}) is not None:
        raise AlbumAlreadyExistsError(name)

    makedirs(_album_dir(current_user.user, name), exist_ok=True)

    uploaded = col_albums.insert_one(
        {"user": current_user.user, "name": name, "title": title, "cover": None}
    )

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "name": name,
            "title": title
        }
    )


@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
    """Return the current user's albums whose name matches ``q``.

    ``q`` is compiled as a regular expression and matched against the
    ``name`` field.
    """
    albums = col_albums.find({"user": current_user.user, "name": re.compile(q)})

    return UJSONResponse(
        {
            "results": [
                {"id": str(album["_id"]), "name": album["name"], "title": album["title"]}
                for album in albums
            ]
        }
    )


album_patch_responses = {
    404: AlbumNotFoundError("id").openapi,
    406: AlbumIncorrectError("name", "error").openapi
}


@app.patch("/albums/{id}", description="Modify album's name or title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_patch_responses)
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Update any subset of an album's name, title and cover.

    Omitted fields keep their stored values. A ``cover`` that does not name
    a photo of this album leaves the stored cover unchanged.

    Raises:
        AlbumNotFoundError: if the current user has no album with ``id``.
        AlbumIncorrectError: if ``name`` or ``title`` fails validation.
    """
    album = _get_owned_album(id, current_user.user)

    # Validate everything before touching the filesystem or the database.
    if title is not None:
        _validate_album_title(title)
    else:
        title = album["title"]

    if name is not None:
        _validate_album_name(name)
    else:
        name = album["name"]

    # Resolve the cover while photos still carry the old album name;
    # after the rename below they are tagged with the new one.
    if cover is not None:
        cover = _resolve_cover(cover, album["name"], album["cover"])
    else:
        cover = album["cover"]

    if name != album["name"]:
        rename(
            _album_dir(current_user.user, album["name"]),
            _album_dir(current_user.user, name)
        )
        col_photos.update_many(
            {"user": current_user.user, "album": album["name"]},
            {"$set": {"album": name}}
        )

    col_albums.update_one(
        {"_id": album["_id"]},
        {"$set": {"name": name, "title": title, "cover": cover}}
    )

    return UJSONResponse(
        {
            "name": name,
            "title": title,
            "cover": cover
        }
    )


album_put_responses = {
    404: AlbumNotFoundError("id").openapi,
    406: AlbumIncorrectError("name", "error").openapi
}


@app.put("/albums/{id}", description="Modify album's name and title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_put_responses)
async def album_put(id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Replace an album's name, title and cover.

    A ``cover`` that does not name a photo of this album is stored as
    ``None``.

    Raises:
        AlbumNotFoundError: if the current user has no album with ``id``.
        AlbumIncorrectError: if ``name`` or ``title`` fails validation.
    """
    album = _get_owned_album(id, current_user.user)

    _validate_album_name(name)
    _validate_album_title(title)

    resolved_cover = _resolve_cover(cover, album["name"], None)

    if name != album["name"]:
        rename(
            _album_dir(current_user.user, album["name"]),
            _album_dir(current_user.user, name)
        )
        col_photos.update_many(
            {"user": current_user.user, "album": album["name"]},
            {"$set": {"album": name}}
        )

    col_albums.update_one(
        {"_id": album["_id"]},
        {"$set": {"name": name, "title": title, "cover": resolved_cover}}
    )

    return UJSONResponse(
        {
            "name": name,
            "title": title,
            "cover": resolved_cover
        }
    )


album_delete_responses = {
    404: AlbumNotFoundError("id").openapi
}


# NOTE(review): this route uses the singular "/album/{id}" while the other
# routes use "/albums/..."; left as-is because clients may depend on it.
@app.delete("/album/{id}", description="Delete album by id", status_code=HTTP_204_NO_CONTENT, responses=album_delete_responses)
async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Delete an album record, its photo records and its directory.

    Raises:
        AlbumNotFoundError: if the current user has no album with ``id``.
    """
    # Scope the delete to the owner: the directory removed below is always
    # under the current user's tree.
    try:
        album = col_albums.find_one_and_delete(
            {"user": current_user.user, "_id": ObjectId(id)}
        )
    except InvalidId:
        raise AlbumNotFoundError(id) from None
    if album is None:
        raise AlbumNotFoundError(id)

    col_photos.delete_many({"user": current_user.user, "album": album["name"]})

    rmtree(_album_dir(current_user.user, album["name"]))

    return Response(status_code=HTTP_204_NO_CONTENT)