import re from datetime import datetime, timezone from os import makedirs, remove from pathlib import Path from random import randint from secrets import token_urlsafe from shutil import move from typing import Union import aiofiles from bson.errors import InvalidId from bson.objectid import ObjectId from fastapi import Security, UploadFile from fastapi.responses import Response, UJSONResponse from magic import Magic from pymongo import DESCENDING from starlette.status import HTTP_204_NO_CONTENT from classes.exceptions import ( AlbumNameNotFoundError, SearchLimitInvalidError, SearchPageInvalidError, SearchTokenInvalidError, UserMediaQuotaReached, VideoNotFoundError, VideoSearchQueryEmptyError, ) from classes.models import ( RandomSearchResultsVideo, SearchResultsVideo, Video, VideoPublic, ) from modules.app import app from modules.database import col_albums, col_photos, col_tokens, col_videos from modules.security import User, get_current_active_user video_post_responses = { 403: UserMediaQuotaReached().openapi, 404: AlbumNameNotFoundError("name").openapi, } @app.post( "/albums/{album}/videos", description="Upload a video to album", response_class=UJSONResponse, response_model=Video, responses=video_post_responses, ) async def video_upload( file: UploadFile, album: str, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["videos.write"]), ): if (await col_albums.find_one({"user": current_user.user, "name": album})) is None: raise AlbumNameNotFoundError(album) user_media_count = ( await col_videos.count_documents({"user": current_user.user}) ) + (await col_photos.count_documents({"user": current_user.user})) if user_media_count >= current_user.quota and not current_user.quota == -1: # type: ignore raise UserMediaQuotaReached() makedirs(Path(f"data/users/{current_user.user}/albums/{album}"), exist_ok=True) filename = file.filename if Path(f"data/users/{current_user.user}/albums/{album}/{file.filename}").exists(): base_name = file.filename.split(".")[:-1] extension = file.filename.split(".")[-1] filename = ( ".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension ) async with aiofiles.open( Path(f"data/users/{current_user.user}/albums/{album}/{filename}"), "wb" ) as f: await f.write(await file.read()) # Hashing and duplicates check should be here # Coords extraction should be here uploaded = await col_videos.insert_one( { "user": current_user.user, "album": album, "filename": filename, "dates": { "uploaded": datetime.now(tz=timezone.utc), "modified": datetime.now(tz=timezone.utc), }, "caption": caption, } ) return UJSONResponse( { "id": uploaded.inserted_id.__str__(), "album": album, "hash": "", # SHOULD BE DONE "filename": filename, } ) video_get_responses = { 200: { "content": { "application/octet-stream": { "schema": { "type": "string", "format": "binary", "contentMediaType": "video/*", } } } }, 404: VideoNotFoundError("id").openapi, } @app.get( "/videos/{id}", description="Get a video by id", responses=video_get_responses, response_class=Response, ) async def video_get( id: str, current_user: User = Security(get_current_active_user, scopes=["videos.read"]), ): try: video = await col_videos.find_one({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId as exc: raise VideoNotFoundError(id) from exc video_path = Path( f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}" ) mime = Magic(mime=True).from_file(video_path) async with aiofiles.open(video_path, "rb") as f: video_file = await f.read() return Response(content=video_file, media_type=mime) video_move_responses = {404: VideoNotFoundError("id").openapi} @app.put( "/videos/{id}", description="Move a video into another album", response_model=VideoPublic, responses=video_move_responses, ) async def video_move( id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"]), ): try: video = await col_videos.find_one({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId as exc: raise VideoNotFoundError(id) from exc if (await col_albums.find_one({"user": current_user.user, "name": album})) is None: raise AlbumNameNotFoundError(album) if Path( f"data/users/{current_user.user}/albums/{album}/{video['filename']}" ).exists(): base_name = video["filename"].split(".")[:-1] extension = video["filename"].split(".")[-1] filename = ( ".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension ) else: filename = video["filename"] await col_videos.find_one_and_update( {"_id": ObjectId(id)}, { "$set": { "album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc), } }, ) move( Path( f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}" ), Path(f"data/users/{current_user.user}/albums/{album}/{filename}"), ) return UJSONResponse( { "id": video["_id"].__str__(), "caption": video["caption"], "filename": filename, } ) video_patch_responses = {404: VideoNotFoundError("id").openapi} @app.patch( "/videos/{id}", description="Change properties of a video", response_model=VideoPublic, responses=video_patch_responses, ) async def video_patch( id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"]), ): try: video = await col_videos.find_one({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId as exc: raise VideoNotFoundError(id) from exc await col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}}, ) return UJSONResponse( { "id": video["_id"].__str__(), "caption": video["caption"], "filename": video["filename"], } ) video_delete_responses = {404: VideoNotFoundError("id").openapi} @app.delete( "/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT, responses=video_delete_responses, ) async def video_delete( id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"]), ): try: video = await col_videos.find_one_and_delete({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId as exc: raise VideoNotFoundError(id) from exc album = await col_albums.find_one({"name": video["album"]}) remove( Path( f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}" ) ) return Response(status_code=HTTP_204_NO_CONTENT) video_random_responses = { 400: SearchLimitInvalidError().openapi, 404: AlbumNameNotFoundError("name").openapi, } @app.get( "/albums/{album}/videos/random", description="Get one random video, optionally by caption", response_class=UJSONResponse, response_model=RandomSearchResultsVideo, responses=video_random_responses, ) async def video_random( album: str, caption: Union[str, None] = None, limit: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"]), ): if (await col_albums.find_one({"user": current_user.user, "name": album})) is None: raise AlbumNameNotFoundError(album) if limit <= 0: raise SearchLimitInvalidError() output = {"results": []} db_query = ( { "user": current_user.user, "album": album, "caption": re.compile(caption), } if caption is not None else { "user": current_user.user, "album": album, } ) documents_count = await col_videos.count_documents(db_query) skip = randint(0, documents_count - 1) if documents_count > 1 else 0 async for video in col_videos.aggregate( [ {"$match": db_query}, {"$skip": skip}, {"$limit": limit}, ] ): output["results"].append( { "id": video["_id"].__str__(), "filename": video["filename"], "caption": video["caption"], } ) return UJSONResponse(output) video_find_responses = { 400: SearchPageInvalidError().openapi, 401: SearchTokenInvalidError().openapi, 404: AlbumNameNotFoundError("name").openapi, 422: VideoSearchQueryEmptyError().openapi, } @app.get( "/albums/{album}/videos", description="Find a video by filename, caption or token", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_responses, ) async def video_find( album: str, q: Union[str, None] = None, caption: Union[str, None] = None, token: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"]), ): if token is not None: found_record = await col_tokens.find_one({"token": token}) if found_record is None: raise SearchTokenInvalidError() return await video_find( album=album, q=found_record["query"], caption=found_record["caption"], page=found_record["page"], page_size=found_record["page_size"], current_user=current_user, ) if (await col_albums.find_one({"user": current_user.user, "name": album})) is None: raise AlbumNameNotFoundError(album) if page <= 0 or page_size <= 0: raise SearchPageInvalidError() output = {"results": []} skip = (page - 1) * page_size if q is None and caption is None: raise VideoSearchQueryEmptyError() if q is None: db_query = { "user": current_user.user, "album": album, "caption": re.compile(caption), } db_query_count = { "user": current_user.user, "album": album, "caption": re.compile(caption), } elif caption is None: db_query = { "user": current_user.user, "album": album, "filename": re.compile(q), } db_query_count = { "user": current_user.user, "album": album, "caption": re.compile(q), } else: db_query = { "user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption), } db_query_count = { "user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption), } async for video in col_videos.find(db_query, limit=page_size, skip=skip).sort( "dates.uploaded", direction=DESCENDING ): output["results"].append( { "id": video["_id"].__str__(), "filename": video["filename"], "caption": video["caption"], } ) if (await col_videos.count_documents(db_query_count)) > page * page_size: token = str(token_urlsafe(32)) await col_tokens.insert_one( { "token": token, "query": q, "caption": caption, "page": page + 1, "page_size": page_size, } ) output["next_page"] = f"/albums/{album}/videos/?token={token}" # type: ignore else: output["next_page"] = None # type: ignore return UJSONResponse(output)