PhotosAPI/extensions/albums.py

230 lines
6.8 KiB
Python

import re
from os import makedirs, rename
from pathlib import Path
from shutil import rmtree
from typing import Union
from bson.errors import InvalidId
from bson.objectid import ObjectId
from fastapi import Security
from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT
from classes.exceptions import (
AlbumAlreadyExistsError,
AlbumIncorrectError,
AlbumNotFoundError,
)
from classes.models import Album, AlbumModified, SearchResultsAlbum
from modules.app import app
from modules.database import col_albums, col_photos
from modules.security import User, get_current_active_user
album_create_responses = {
406: AlbumIncorrectError("name", "error").openapi,
409: AlbumAlreadyExistsError("name").openapi,
}
@app.post(
"/albums",
description="Create album with name and title",
response_class=UJSONResponse,
response_model=Album,
responses=album_create_responses,
)
async def album_create(
name: str,
title: str,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
if 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
if (await col_albums.find_one({"name": name})) is not None:
raise AlbumAlreadyExistsError(name)
makedirs(Path(f"data/users/{current_user.user}/albums/{name}"), exist_ok=True)
uploaded = await col_albums.insert_one(
{"user": current_user.user, "name": name, "title": title, "cover": None}
)
return UJSONResponse(
{"id": uploaded.inserted_id.__str__(), "name": name, "title": title}
)
@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(
q: str,
current_user: User = Security(get_current_active_user, scopes=["albums.list"]),
):
output = {"results": []}
async for album in col_albums.find(
{"user": current_user.user, "name": re.compile(q)}
):
output["results"].append(
{
"id": album["_id"].__str__(),
"name": album["name"],
"title": album["title"],
}
)
return UJSONResponse(output)
album_patch_responses = {
404: AlbumNotFoundError("id").openapi,
406: AlbumIncorrectError("name", "error").openapi,
}
@app.patch(
"/albums/{id}",
description="Modify album's name or title by id",
response_class=UJSONResponse,
response_model=AlbumModified,
responses=album_patch_responses,
)
async def album_patch(
id: str,
name: Union[str, None] = None,
title: Union[str, None] = None,
cover: Union[str, None] = None,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
try:
album = await col_albums.find_one({"_id": ObjectId(id)})
if album is None:
raise InvalidId(id)
except InvalidId as exc:
raise AlbumNotFoundError(id) from exc
if title is None:
title = album["title"]
elif 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
if name is not None:
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError(
"name", "can only contain a-z, 0-9 and _ characters."
)
if 2 > len(name) > 20:
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
rename(
Path(f"data/users/{current_user.user}/albums/{album['name']}"),
Path(f"data/users/{current_user.user}/albums/{name}"),
)
await col_photos.update_many(
{"user": current_user.user, "album": album["name"]},
{"$set": {"album": name}},
)
else:
name = album["name"]
if cover is not None:
image = await col_photos.find_one(
{"_id": ObjectId(cover), "album": album["name"]}
)
cover = image["_id"].__str__() if image is not None else album["cover"]
else:
cover = album["cover"]
await col_albums.update_one(
{"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}}
)
return UJSONResponse({"name": name, "title": title, "cover": cover})
album_put_responses = {
404: AlbumNotFoundError("id").openapi,
406: AlbumIncorrectError("name", "error").openapi,
}
@app.put(
"/albums/{id}",
description="Modify album's name and title by id",
response_class=UJSONResponse,
response_model=AlbumModified,
responses=album_put_responses,
)
async def album_put(
id: str,
name: str,
title: str,
cover: str,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
try:
album = await col_albums.find_one({"_id": ObjectId(id)})
if album is None:
raise InvalidId(id)
except InvalidId as exc:
raise AlbumNotFoundError(id) from exc
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
if 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
image = await col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]})
cover = image["_id"].__str__() if image is not None else None # type: ignore
rename(
Path(f"data/users/{current_user.user}/albums/{album['name']}"),
Path(f"data/users/{current_user.user}/albums/{name}"),
)
await col_photos.update_many(
{"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}}
)
await col_albums.update_one(
{"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}}
)
return UJSONResponse({"name": name, "title": title, "cover": cover})
album_delete_responses = {404: AlbumNotFoundError("id").openapi}
@app.delete(
"/album/{id}",
description="Delete album by id",
status_code=HTTP_204_NO_CONTENT,
responses=album_delete_responses,
)
async def album_delete(
id: str,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
try:
album = await col_albums.find_one_and_delete({"_id": ObjectId(id)})
if album is None:
raise InvalidId(id)
except InvalidId as exc:
raise AlbumNotFoundError(id) from exc
await col_photos.delete_many({"album": album["name"]})
rmtree(Path(f"data/users/{current_user.user}/albums/{album['name']}"))
return Response(status_code=HTTP_204_NO_CONTENT)