import re
from os import makedirs, path, rename
from shutil import rmtree
from typing import Union

from classes.models import Album, AlbumModified, SearchResults
from modules.app import app
from modules.database import col_photos, col_albums
from modules.security import User, get_current_active_user

from bson.objectid import ObjectId
from bson.errors import InvalidId
from fastapi import HTTPException, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT


# Album names are used as directory names on disk, so they are restricted to a
# safe character set. The previous class "[a-z,0-9,_]" also admitted commas.
_ALBUM_NAME_PATTERN = re.compile(r"[a-z0-9_]*")


def _album_dir(user: str, name: str) -> str:
    """Return the directory path used for album ``name`` of ``user``."""
    return path.join("data", "users", user, "albums", name)


def _validate_album_name(name: str) -> None:
    """Raise HTTP 406 unless ``name`` uses only a-z, 0-9, _ and is 2..20 long."""
    # re.search()/fullmatch() return None on failure, never False.
    if _ALBUM_NAME_PATTERN.fullmatch(name) is None:
        raise HTTPException(
            status_code=HTTP_406_NOT_ACCEPTABLE,
            detail="Album name can only contain: a-z, 0-9 and _ characters.",
        )
    # "2 > len(name) > 20" can never be true; test both bounds explicitly.
    if not 2 <= len(name) <= 20:
        raise HTTPException(
            status_code=HTTP_406_NOT_ACCEPTABLE,
            detail="Album name must be >2 and <20 characters.",
        )


def _validate_album_title(title: str) -> None:
    """Raise HTTP 406 unless ``title`` is 2..40 characters long."""
    if not 2 <= len(title) <= 40:
        raise HTTPException(
            status_code=HTTP_406_NOT_ACCEPTABLE,
            detail="Album title must be >2 and <40 characters.",
        )


def _album_not_found() -> HTTPException:
    """Build the 404 error shared by all id-based album endpoints."""
    return HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an album with such id.")


def _get_owned_album(album_id: str, user: str) -> dict:
    """Return the album document with ``album_id`` owned by ``user``.

    Raises HTTP 404 when the id is malformed, unknown, or belongs to another
    user. The ownership filter matters because the endpoints go on to touch
    files under the *current* user's directory.
    """
    try:
        album = col_albums.find_one({"_id": ObjectId(album_id), "user": user})
    except InvalidId:
        album = None
    if album is None:
        raise _album_not_found()
    return album


def _resolve_cover(cover: str, album_name: str, fallback: Union[str, None]) -> Union[str, None]:
    """Return ``cover`` as a string id if it is a photo of ``album_name``.

    A malformed id or a photo that is not in the album yields ``fallback``
    instead of an unhandled ``InvalidId``.
    """
    try:
        image = col_photos.find_one({"_id": ObjectId(cover), "album": album_name})
    except InvalidId:
        image = None
    return str(image["_id"]) if image is not None else fallback


@app.post("/albums", response_class=UJSONResponse, response_model=Album, description="Create album with name and title")
async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Create an album for the current user.

    Validates ``name`` and ``title``, rejects an already used name with
    HTTP 409, creates the album directory and inserts the album document.
    Returns the new album's id, name and title.
    """
    _validate_album_name(name)
    _validate_album_title(title)

    if col_albums.find_one({"name": name}) is not None:
        raise HTTPException(status_code=HTTP_409_CONFLICT, detail=f"Album with name '{name}' already exists.")

    makedirs(_album_dir(current_user.user, name), exist_ok=True)

    uploaded = col_albums.insert_one({"user": current_user.user, "name": name, "title": title, "cover": None})

    return UJSONResponse({"id": str(uploaded.inserted_id), "name": name, "title": title})


@app.get("/albums", response_model=SearchResults, description="Find album by name")
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
    """Return the current user's albums whose name matches the regex ``q``.

    Raises HTTP 406 when ``q`` is not a valid regular expression.
    """
    try:
        pattern = re.compile(q)
    except re.error:
        # A malformed pattern is a client error, not a server failure.
        raise HTTPException(
            status_code=HTTP_406_NOT_ACCEPTABLE,
            detail="Search query is not a valid regular expression.",
        ) from None

    results = [
        {"id": str(album["_id"]), "name": album["name"]}
        for album in col_albums.find({"user": current_user.user, "name": pattern})
    ]
    return UJSONResponse({"results": results})


@app.patch("/albums/{id}", response_class=UJSONResponse, response_model=AlbumModified, description="Modify album's name or title by id")
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Partially update an album; omitted fields keep their stored values.

    Raises HTTP 404 for an unknown id and HTTP 406 for an invalid name or
    title. A ``cover`` that is not a photo of this album leaves the stored
    cover unchanged. Returns the resulting name, title and cover.
    """
    album = _get_owned_album(id, current_user.user)

    if title is None:
        title = album["title"]
    else:
        _validate_album_title(title)

    renaming = name is not None
    if renaming:
        _validate_album_name(name)
    else:
        name = album["name"]

    # Resolve the cover BEFORE photos are moved to the new album name;
    # afterwards a lookup by the old name could never match.
    if cover is None:
        cover = album["cover"]
    else:
        cover = _resolve_cover(cover, album["name"], album["cover"])

    if renaming:
        rename(_album_dir(current_user.user, album["name"]), _album_dir(current_user.user, name))
        col_photos.update_many({"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}})

    col_albums.update_one({"_id": album["_id"]}, {"$set": {"name": name, "title": title, "cover": cover}})

    return UJSONResponse({"name": name, "title": title, "cover": cover})


@app.put("/albums/{id}", response_class=UJSONResponse, response_model=AlbumModified, description="Modify album's name and title by id")
async def album_put(id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Replace an album's name, title and cover.

    Raises HTTP 404 for an unknown id and HTTP 406 for an invalid name or
    title. A ``cover`` that is not a photo of this album is stored as
    ``None``. Returns the resulting name, title and cover.
    """
    album = _get_owned_album(id, current_user.user)

    _validate_album_name(name)
    _validate_album_title(title)

    resolved_cover = _resolve_cover(cover, album["name"], None)

    rename(_album_dir(current_user.user, album["name"]), _album_dir(current_user.user, name))
    col_photos.update_many({"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}})
    col_albums.update_one({"_id": album["_id"]}, {"$set": {"name": name, "title": title, "cover": resolved_cover}})

    return UJSONResponse({"name": name, "title": title, "cover": resolved_cover})


# NOTE(review): this route is "/album/{id}" while every other endpoint uses
# "/albums/..."; kept as-is because clients may depend on it - confirm.
@app.delete("/album/{id}", response_class=UJSONResponse, description="Delete album by id")
async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Delete the current user's album, its photo documents and its directory.

    Raises HTTP 404 when the id is malformed, unknown, or the album belongs
    to another user. Responds with HTTP 204 on success.
    """
    try:
        # Filter on the owner so one user cannot delete another user's album.
        album = col_albums.find_one_and_delete({"_id": ObjectId(id), "user": current_user.user})
    except InvalidId:
        album = None
    if album is None:
        raise _album_not_found()

    col_photos.delete_many({"album": album["name"]})

    rmtree(_album_dir(current_user.user, album["name"]))

    return Response(status_code=HTTP_204_NO_CONTENT)