"""HTTP endpoints for uploading, fetching, moving, editing, deleting and searching videos."""

import re
import pickle
from secrets import token_urlsafe
from shutil import move
from typing import Union
from magic import Magic
from datetime import datetime, timezone
from os import makedirs, path, remove

from classes.exceptions import (
    AlbumNameNotFoundError,
    SearchPageInvalidError,
    SearchTokenInvalidError,
    VideoNotFoundError,
    VideoSearchQueryEmptyError,
)
from classes.models import Video, SearchResultsVideo, VideoPublic
from modules.security import User, get_current_active_user
from modules.app import app
from modules.database import col_videos, col_albums, col_tokens

from bson.objectid import ObjectId
from bson.errors import InvalidId
from pymongo import DESCENDING

from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT


def _album_dir(user: str, album: str) -> str:
    """Return the on-disk directory that holds the files of *album* for *user*."""
    return path.join("data", "users", user, "albums", album)


def _timestamped_filename(filename: str) -> str:
    """Return *filename* with ``_<unix timestamp>`` inserted before its extension.

    ``os.path.splitext`` is used so that a name without an extension keeps its
    stem (``clip`` -> ``clip_<ts>``) instead of being turned into an extension.
    """
    stem, extension = path.splitext(filename)
    return f"{stem}_{int(datetime.now().timestamp())}{extension}"


def _video_filter(id: str, user: str) -> dict:
    """Build the MongoDB filter matching video *id* owned by *user*.

    Raises:
        VideoNotFoundError: if *id* is not a valid ObjectId string.
    """
    try:
        object_id = ObjectId(id)
    except InvalidId:
        raise VideoNotFoundError(id)
    # Restricting by owner keeps one user from reaching another user's records.
    return {"_id": object_id, "user": user}


video_post_responses = {404: AlbumNameNotFoundError("name").openapi}


@app.post(
    "/albums/{album}/videos",
    description="Upload a video to album",
    response_class=UJSONResponse,
    response_model=Video,
    responses=video_post_responses,
)
async def video_upload(
    file: UploadFile,
    album: str,
    caption: Union[str, None] = None,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Store an uploaded file in the user's album and register it in ``col_videos``.

    Args:
        file: The uploaded file; its content is written to the album directory.
        album: Name of an existing album owned by the current user.
        caption: Optional caption stored with the record.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        A JSON response with the new record id, album, an empty ``hash``
        placeholder and the filename actually used on disk.

    Raises:
        AlbumNameNotFoundError: if the user has no album with that name.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    album_dir = _album_dir(current_user.user, album)
    makedirs(album_dir, exist_ok=True)

    # The filename is client-controlled: keep only its last path component so
    # it cannot point outside the album directory.
    filename = path.basename(file.filename or "")

    # Avoid overwriting an existing file by appending a timestamp to the name.
    if path.exists(path.join(album_dir, filename)):
        filename = _timestamped_filename(filename)

    with open(path.join(album_dir, filename), "wb") as f:
        f.write(await file.read())

    # Hashing and duplicates check should be here

    # Coords extraction should be here

    now = datetime.now(tz=timezone.utc)
    uploaded = col_videos.insert_one(
        {
            "user": current_user.user,
            "album": album,
            "filename": filename,
            "dates": {
                "uploaded": now,
                "modified": now,
            },
            "caption": caption,
        }
    )

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "album": album,
            "hash": "",  # SHOULD BE DONE
            "filename": filename,
        }
    )


video_get_responses = {404: VideoNotFoundError("id").openapi}


@app.get("/videos/{id}", description="Get a video by id", responses=video_get_responses)
async def video_get(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.read"]),
):
    """Return the raw bytes of the current user's video with the given id.

    The media type of the response is detected from the file with ``Magic``.

    Raises:
        VideoNotFoundError: if the id is malformed or no such video belongs
            to the current user.
    """
    video = col_videos.find_one(_video_filter(id, current_user.user))
    if video is None:
        raise VideoNotFoundError(id)

    video_path = path.join(
        _album_dir(current_user.user, video["album"]), video["filename"]
    )

    mime = Magic(mime=True).from_file(video_path)

    with open(video_path, "rb") as f:
        video_file = f.read()

    return Response(video_file, media_type=mime)


video_move_responses = {404: VideoNotFoundError("id").openapi}


@app.put(
    "/videos/{id}",
    description="Move a video into another album",
    response_model=VideoPublic,
    responses=video_move_responses,
)
async def video_move(
    id: str,
    album: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Move one of the current user's videos into another of their albums.

    If the target album directory already contains a file with the same name,
    the moved file gets a timestamp suffix.

    Returns:
        A JSON response with the video id, its caption and its new filename.

    Raises:
        VideoNotFoundError: if the id is malformed or no such video belongs
            to the current user.
        AlbumNameNotFoundError: if the user has no album named *album*.
    """
    video_filter = _video_filter(id, current_user.user)
    video = col_videos.find_one(video_filter)
    if video is None:
        raise VideoNotFoundError(id)

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    target_dir = _album_dir(current_user.user, album)
    filename = video["filename"]
    if path.exists(path.join(target_dir, filename)):
        filename = _timestamped_filename(filename)

    col_videos.find_one_and_update(
        video_filter,
        {
            "$set": {
                "album": album,
                "filename": filename,
                "dates.modified": datetime.now(tz=timezone.utc),
            }
        },
    )

    move(
        path.join(_album_dir(current_user.user, video["album"]), video["filename"]),
        path.join(target_dir, filename),
    )

    return UJSONResponse(
        {
            "id": str(video["_id"]),
            "caption": video["caption"],
            "filename": filename,
        }
    )


video_patch_responses = {404: VideoNotFoundError("id").openapi}


@app.patch(
    "/videos/{id}",
    description="Change properties of a video",
    response_model=VideoPublic,
    responses=video_patch_responses,
)
async def video_patch(
    id: str,
    caption: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Replace the caption of one of the current user's videos.

    Returns:
        A JSON response with the video id, the caption that was just stored
        and the filename.

    Raises:
        VideoNotFoundError: if the id is malformed or no such video belongs
            to the current user.
    """
    video_filter = _video_filter(id, current_user.user)
    video = col_videos.find_one(video_filter)
    if video is None:
        raise VideoNotFoundError(id)

    col_videos.find_one_and_update(
        video_filter,
        {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}},
    )

    return UJSONResponse(
        {
            "id": str(video["_id"]),
            # Report the caption that was written, not the pre-update value.
            "caption": caption,
            "filename": video["filename"],
        }
    )


video_delete_responses = {404: VideoNotFoundError("id").openapi}


@app.delete(
    "/videos/{id}",
    description="Delete a video by id",
    status_code=HTTP_204_NO_CONTENT,
    responses=video_delete_responses,
)
async def video_delete(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Delete one of the current user's videos from the database and from disk.

    Raises:
        VideoNotFoundError: if the id is malformed or no such video belongs
            to the current user.
    """
    video = col_videos.find_one_and_delete(_video_filter(id, current_user.user))
    if video is None:
        raise VideoNotFoundError(id)

    remove(
        path.join(_album_dir(current_user.user, video["album"]), video["filename"])
    )

    return Response(status_code=HTTP_204_NO_CONTENT)


video_find_responses = {
    400: SearchPageInvalidError().openapi,
    404: AlbumNameNotFoundError("name").openapi,
    422: VideoSearchQueryEmptyError().openapi,
}


@app.get(
    "/albums/{album}/videos",
    description="Find a video by filename",
    response_class=UJSONResponse,
    response_model=SearchResultsVideo,
    responses=video_find_responses,
)
async def video_find(
    album: str,
    q: Union[str, None] = None,
    caption: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
    current_user: User = Security(get_current_active_user, scopes=["videos.list"]),
):
    """Search the videos of an album by filename and/or caption, with paging.

    Args:
        album: Name of an existing album owned by the current user.
        q: Regular expression matched against the filename.
        caption: Regular expression matched against the caption.
        page: 1-based page number.
        page_size: Maximum number of results per page.
        current_user: Authenticated user holding the ``videos.list`` scope.

    Returns:
        A JSON response with ``results`` (id, filename, caption of each match,
        newest upload first) and ``next_page`` (a token URL, or ``None`` when
        there are no further results).

    Raises:
        AlbumNameNotFoundError: if the user has no album with that name.
        SearchPageInvalidError: if *page* or *page_size* is not positive.
        VideoSearchQueryEmptyError: if neither *q* nor *caption* is given.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()

    if q is None and caption is None:
        raise VideoSearchQueryEmptyError()

    # One filter serves both the page query and the total count, so the
    # "is there a next page" decision always matches what was searched.
    # NOTE(review): q/caption are compiled as regular expressions as-is; an
    # invalid pattern raises re.error here.
    db_query = {"user": current_user.user, "album": album}
    if q is not None:
        db_query["filename"] = re.compile(q)
    if caption is not None:
        db_query["caption"] = re.compile(caption)

    skip = (page - 1) * page_size
    videos = col_videos.find(db_query, limit=page_size, skip=skip).sort(
        "dates.uploaded", DESCENDING
    )

    results = [
        {
            "id": str(video["_id"]),
            "filename": video["filename"],
            "caption": video["caption"],
        }
        for video in videos
    ]

    next_page = None
    if col_videos.count_documents(db_query) > page * page_size:
        token = str(token_urlsafe(32))
        col_tokens.insert_one(
            {
                "token": token,
                "query": q,
                # Stored so that the follow-up page applies the same caption filter.
                "caption": caption,
                "album": album,
                "page": page + 1,
                "page_size": page_size,
                "user": pickle.dumps(current_user),
            }
        )
        next_page = f"/albums/{album}/videos/token?token={token}"

    return UJSONResponse({"results": results, "next_page": next_page})


video_find_token_responses = {401: SearchTokenInvalidError().openapi}


@app.get(
    "/albums/{album}/videos/token",
    description="Find a video by token",
    response_class=UJSONResponse,
    response_model=SearchResultsVideo,
    responses=video_find_token_responses,
)
async def video_find_token(token: str):
    """Continue a paged search using a token issued by ``video_find``.

    Raises:
        SearchTokenInvalidError: if no stored search matches *token*.
    """
    found_record = col_tokens.find_one({"token": token})

    if found_record is None:
        raise SearchTokenInvalidError()

    # SECURITY NOTE(review): pickle.loads executes arbitrary code if the stored
    # bytes can be tampered with; this relies on col_tokens being trusted.
    # Consider storing a plain user identifier instead.
    return await video_find(
        q=found_record["query"],
        # .get keeps tokens written before "caption" was stored working.
        caption=found_record.get("caption"),
        album=found_record["album"],
        page=found_record["page"],
        page_size=found_record["page_size"],
        current_user=pickle.loads(found_record["user"]),
    )