PhotosAPI/extensions/albums.py

167 lines
6.6 KiB
Python

import re
from os import makedirs, path, rename
from shutil import rmtree
from typing import Union
from classes.exceptions import AlbumAlreadyExistsError, AlbumIncorrectError, AlbumNotFoundError
from classes.models import Album, AlbumModified, SearchResultsAlbum
from modules.app import app
from modules.database import col_photos, col_albums
from modules.security import User, get_current_active_user
from bson.objectid import ObjectId
from bson.errors import InvalidId
from fastapi import Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT
# Extra OpenAPI response descriptions for the album creation endpoint,
# keyed by HTTP status code.
album_create_responses = {
    406: AlbumIncorrectError("name", "error").openapi,
    409: AlbumAlreadyExistsError("name").openapi,
}
@app.post(
    "/albums",
    description="Create album with name and title",
    response_class=UJSONResponse,
    response_model=Album,
    responses=album_create_responses,
)
async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Create a new album for the authenticated user.

    Validates ``name`` and ``title``, creates the album directory under the
    user's data folder and stores the album document in ``col_albums``.

    Args:
        name: Album identifier; lowercase latin letters, digits and
            underscores only, 2 to 20 characters long.
        title: Human-readable album title, 2 to 40 characters long.
        current_user: Authenticated user holding the ``albums.write`` scope.

    Returns:
        UJSONResponse with the new album's ``id``, ``name`` and ``title``.

    Raises:
        AlbumIncorrectError: ``name`` or ``title`` failed validation.
        AlbumAlreadyExistsError: An album with this name already exists.
    """
    # re.search()/re.fullmatch() return a Match or None (never False), so the
    # result must be compared against None. The character class also must not
    # contain commas: inside [...] they are literal characters, which would
    # contradict the error message below.
    if re.fullmatch(r"[a-z0-9_]*", name) is None:
        raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")

    # `2 > n > 20` means `2 > n and n > 20`, which can never be true;
    # reject lengths outside the 2..20 range instead.
    if not 2 <= len(name) <= 20:
        raise AlbumIncorrectError("name", "must be >2 and <20 characters.")

    if not 2 <= len(title) <= 40:
        raise AlbumIncorrectError("title", "must be >2 and <40 characters.")

    if col_albums.find_one({"name": name}) is not None:
        raise AlbumAlreadyExistsError(name)

    # `name` is only used as a path component after it passed the
    # character whitelist above.
    makedirs(path.join("data", "users", current_user.user, "albums", name), exist_ok=True)

    uploaded = col_albums.insert_one(
        {"user": current_user.user, "name": name, "title": title, "cover": None}
    )

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "name": name,
            "title": title,
        }
    )
@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
    """Search the authenticated user's albums by name.

    Args:
        q: Regular expression matched against album names. A string that is
            not a valid regular expression is matched literally instead.
        current_user: Authenticated user holding the ``albums.list`` scope.

    Returns:
        UJSONResponse of the form ``{"results": [...]}`` where every entry
        carries the album's ``id``, ``name`` and ``title``.
    """
    try:
        pattern = re.compile(q)
    except re.error:
        # `q` comes straight from the request; a malformed pattern should
        # not crash the handler, so fall back to a literal match.
        pattern = re.compile(re.escape(q))

    albums = col_albums.find({"user": current_user.user, "name": pattern})

    return UJSONResponse(
        {
            "results": [
                {"id": str(album["_id"]), "name": album["name"], "title": album["title"]}
                for album in albums
            ]
        }
    )
# Extra OpenAPI response descriptions for the album PATCH endpoint,
# keyed by HTTP status code.
album_patch_responses = {
    404: AlbumNotFoundError("id").openapi,
    406: AlbumIncorrectError("name", "error").openapi,
}
@app.patch(
    "/albums/{id}",
    description="Modify album's name or title by id",
    response_class=UJSONResponse,
    response_model=AlbumModified,
    responses=album_patch_responses,
)
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Partially update an album owned by the authenticated user.

    Any of ``name``, ``title`` and ``cover`` may be omitted; omitted fields
    keep their stored value.

    Args:
        id: String form of the album's ObjectId.
        name: New album name (a-z, 0-9, ``_``; 2 to 20 characters).
        title: New album title (2 to 40 characters).
        cover: String form of the ObjectId of a photo in this album to use
            as cover. If no such photo is found the stored cover is kept.
        current_user: Authenticated user holding the ``albums.write`` scope.

    Returns:
        UJSONResponse with the resulting ``name``, ``title`` and ``cover``.

    Raises:
        AlbumNotFoundError: ``id`` is malformed or matches no album of the
            current user.
        AlbumIncorrectError: ``name`` or ``title`` failed validation.
    """
    try:
        album_id = ObjectId(id)
    except InvalidId:
        raise AlbumNotFoundError(id) from None

    # Restrict the lookup to the caller's own albums so one user cannot
    # modify an album that belongs to somebody else.
    album = col_albums.find_one({"_id": album_id, "user": current_user.user})
    if album is None:
        raise AlbumNotFoundError(id)

    # Validate every supplied field before touching the disk or database.
    # `2 > n > 20` can never be true, so explicit range checks are used.
    if title is not None and not 2 <= len(title) <= 40:
        raise AlbumIncorrectError("title", "must be >2 and <40 characters.")

    if name is not None:
        # re.fullmatch() returns None (not False) when nothing matches;
        # commas are not part of the allowed alphabet.
        if re.fullmatch(r"[a-z0-9_]*", name) is None:
            raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
        if not 2 <= len(name) <= 20:
            raise AlbumIncorrectError("name", "must be >2 and <20 characters.")

    if title is None:
        title = album["title"]

    if name is None:
        name = album["name"]
    elif name != album["name"]:
        albums_root = path.join("data", "users", current_user.user, "albums")
        rename(path.join(albums_root, album["name"]), path.join(albums_root, name))
        col_photos.update_many(
            {"user": current_user.user, "album": album["name"]},
            {"$set": {"album": name}},
        )

    if cover is not None:
        try:
            # Photos have already been moved to `name` at this point, so the
            # lookup must use the effective name rather than the old one.
            image = col_photos.find_one(
                {"_id": ObjectId(cover), "user": current_user.user, "album": name}
            )
        except InvalidId:
            # A malformed photo id is treated like a photo that was not found.
            image = None
        cover = str(image["_id"]) if image is not None else album["cover"]
    else:
        cover = album["cover"]

    col_albums.update_one(
        {"_id": album_id},
        {"$set": {"name": name, "title": title, "cover": cover}},
    )

    return UJSONResponse(
        {
            "name": name,
            "title": title,
            "cover": cover,
        }
    )
# Extra OpenAPI response descriptions for the album PUT endpoint,
# keyed by HTTP status code.
album_put_responses = {
    404: AlbumNotFoundError("id").openapi,
    406: AlbumIncorrectError("name", "error").openapi,
}
@app.put(
    "/albums/{id}",
    description="Modify album's name and title by id",
    response_class=UJSONResponse,
    response_model=AlbumModified,
    responses=album_put_responses,
)
async def album_put(id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Replace the name, title and cover of an album owned by the user.

    Args:
        id: String form of the album's ObjectId.
        name: New album name (a-z, 0-9, ``_``; 2 to 20 characters).
        title: New album title (2 to 40 characters).
        cover: String form of the ObjectId of a photo in this album to use
            as cover. If no such photo is found the cover is set to ``None``.
        current_user: Authenticated user holding the ``albums.write`` scope.

    Returns:
        UJSONResponse with the resulting ``name``, ``title`` and ``cover``.

    Raises:
        AlbumNotFoundError: ``id`` is malformed or matches no album of the
            current user.
        AlbumIncorrectError: ``name`` or ``title`` failed validation.
    """
    try:
        album_id = ObjectId(id)
    except InvalidId:
        raise AlbumNotFoundError(id) from None

    # Restrict the lookup to the caller's own albums so one user cannot
    # modify an album that belongs to somebody else.
    album = col_albums.find_one({"_id": album_id, "user": current_user.user})
    if album is None:
        raise AlbumNotFoundError(id)

    # re.fullmatch() returns None (not False) when nothing matches;
    # commas are not part of the allowed alphabet.
    if re.fullmatch(r"[a-z0-9_]*", name) is None:
        raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")

    # `2 > n > 20` can never be true, so explicit range checks are used.
    if not 2 <= len(name) <= 20:
        raise AlbumIncorrectError("name", "must be >2 and <20 characters.")

    if not 2 <= len(title) <= 40:
        raise AlbumIncorrectError("title", "must be >2 and <40 characters.")

    # Resolve the cover while photos still carry the old album name.
    try:
        image = col_photos.find_one(
            {"_id": ObjectId(cover), "user": current_user.user, "album": album["name"]}
        )
    except InvalidId:
        # A malformed photo id is treated like a photo that was not found.
        image = None
    cover = str(image["_id"]) if image is not None else None  # type: ignore

    if name != album["name"]:
        albums_root = path.join("data", "users", current_user.user, "albums")
        rename(path.join(albums_root, album["name"]), path.join(albums_root, name))
        col_photos.update_many(
            {"user": current_user.user, "album": album["name"]},
            {"$set": {"album": name}},
        )

    col_albums.update_one(
        {"_id": album_id},
        {"$set": {"name": name, "title": title, "cover": cover}},
    )

    return UJSONResponse(
        {
            "name": name,
            "title": title,
            "cover": cover,
        }
    )
# Extra OpenAPI response descriptions for the album deletion endpoint,
# keyed by HTTP status code.
album_delete_responses = {
    404: AlbumNotFoundError("id").openapi,
}
# The plural path matches every other album endpoint; the original singular
# path stays registered (marked deprecated) so existing clients keep working.
@app.delete(
    "/albums/{id}",
    description="Delete album by id",
    status_code=HTTP_204_NO_CONTENT,
    responses=album_delete_responses,
)
@app.delete(
    "/album/{id}",
    description="Delete album by id",
    status_code=HTTP_204_NO_CONTENT,
    responses=album_delete_responses,
    deprecated=True,
)
async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
    """Delete an album owned by the authenticated user.

    Removes the album document, the photo documents that reference the
    album, and the album directory on disk.

    Args:
        id: String form of the album's ObjectId.
        current_user: Authenticated user holding the ``albums.write`` scope.

    Returns:
        Empty response with status 204.

    Raises:
        AlbumNotFoundError: ``id`` is malformed or matches no album of the
            current user.
    """
    try:
        album_id = ObjectId(id)
    except InvalidId:
        raise AlbumNotFoundError(id) from None

    # Restrict deletion to the caller's own albums so one user cannot
    # delete an album (and its photos) that belongs to somebody else.
    album = col_albums.find_one_and_delete({"_id": album_id, "user": current_user.user})
    if album is None:
        raise AlbumNotFoundError(id)

    col_photos.delete_many({"user": current_user.user, "album": album["name"]})

    try:
        rmtree(path.join("data", "users", current_user.user, "albums", album["name"]))
    except FileNotFoundError:
        # The directory is already gone, which is the desired end state;
        # the database records have been removed above.
        pass

    return Response(status_code=HTTP_204_NO_CONTENT)