import re
import pickle
from datetime import datetime, timezone
from os import makedirs, path, remove
from secrets import token_urlsafe
from shutil import move
from typing import Union

from magic import Magic
from bson.objectid import ObjectId
from bson.errors import InvalidId
from pymongo import DESCENDING
from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT

from classes.exceptions import AlbumNameNotFoundError, SearchPageInvalidError, SearchTokenInvalidError, VideoNotFoundError, VideoSearchQueryEmptyError
from classes.models import Video, SearchResultsVideo, VideoPublic
from modules.security import User, get_current_active_user
from modules.app import app
from modules.database import col_videos, col_albums, col_tokens


def _album_dir(user: str, album: str) -> str:
    """Return the on-disk directory that stores *album* of *user*."""
    return path.join("data", "users", user, "albums", album)


def _resolve_filename(directory: str, filename: str) -> str:
    """Return *filename*, or a timestamp-suffixed variant if it is taken in *directory*."""
    if not path.exists(path.join(directory, filename)):
        return filename
    # splitext keeps extensionless names sane ("clip" -> "clip_<ts>"),
    # whereas splitting on "." produced "_<ts>.clip".
    stem, extension = path.splitext(filename)
    return f"{stem}_{int(datetime.now().timestamp())}{extension}"


def _parse_video_id(video_id: str) -> ObjectId:
    """Convert *video_id* to an ObjectId, raising VideoNotFoundError if malformed."""
    try:
        return ObjectId(video_id)
    except InvalidId:
        raise VideoNotFoundError(video_id) from None


video_post_responses = {
    404: AlbumNameNotFoundError("name").openapi
}


@app.post("/albums/{album}/videos", description="Upload a video to album", response_class=UJSONResponse, response_model=Video, responses=video_post_responses)
async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
    """Store an uploaded file in the user's album and register it in the database.

    Args:
        file: Uploaded video file.
        album: Name of an album owned by the current user.
        caption: Optional caption stored alongside the video.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        UJSONResponse with the new document id, album, (empty) hash and the
        filename actually used on disk.

    Raises:
        AlbumNameNotFoundError: The user has no album called *album*.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    album_dir = _album_dir(current_user.user, album)
    makedirs(album_dir, exist_ok=True)

    # The filename is client-supplied: strip any directory components so a
    # name like "../../x" cannot escape the album directory.
    filename = _resolve_filename(album_dir, path.basename(file.filename))

    with open(path.join(album_dir, filename), "wb") as f:
        f.write(await file.read())

    # Hashing and duplicates check should be here

    # Coords extraction should be here

    # One timestamp so "uploaded" and "modified" are identical on creation.
    now = datetime.now(tz=timezone.utc)
    uploaded = col_videos.insert_one(
        {
            "user": current_user.user,
            "album": album,
            "filename": filename,
            "dates": {"uploaded": now, "modified": now},
            "caption": caption,
        }
    )

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "album": album,
            "hash": "",  # SHOULD BE DONE
            "filename": filename,
        }
    )


video_get_responses = {
    404: VideoNotFoundError("id").openapi
}


@app.get("/videos/{id}", description="Get a video by id", responses=video_get_responses)
async def video_get(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.read"])):
    """Return the raw bytes of one of the current user's videos.

    Args:
        id: String form of the video's ObjectId.
        current_user: Authenticated user holding the ``videos.read`` scope.

    Returns:
        Response with the file content and a MIME type detected by libmagic.

    Raises:
        VideoNotFoundError: *id* is malformed or matches no video of this user.
    """
    # Scope the lookup to the owner: the file path below is built from the
    # current user, so another user's record could never resolve correctly.
    video = col_videos.find_one({"_id": _parse_video_id(id), "user": current_user.user})
    if video is None:
        raise VideoNotFoundError(id)

    video_path = path.join(_album_dir(current_user.user, video["album"]), video["filename"])

    mime = Magic(mime=True).from_file(video_path)

    with open(video_path, "rb") as f:
        video_file = f.read()

    return Response(video_file, media_type=mime)


video_move_responses = {
    404: VideoNotFoundError("id").openapi
}


@app.put("/videos/{id}", description="Move a video into another album", response_model=VideoPublic, responses=video_move_responses)
async def video_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
    """Move one of the current user's videos into another of their albums.

    Args:
        id: String form of the video's ObjectId.
        album: Name of the destination album owned by the current user.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        UJSONResponse with the video id, caption and resulting filename.

    Raises:
        VideoNotFoundError: *id* is malformed or matches no video of this user.
        AlbumNameNotFoundError: The user has no album called *album*.
    """
    video = col_videos.find_one({"_id": _parse_video_id(id), "user": current_user.user})
    if video is None:
        raise VideoNotFoundError(id)

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    target_dir = _album_dir(current_user.user, album)
    # The destination directory is otherwise only created on upload.
    makedirs(target_dir, exist_ok=True)
    filename = _resolve_filename(target_dir, video["filename"])

    # Move the file first: if this fails, the database still points at the
    # old (valid) location instead of a path that does not exist.
    move(
        path.join(_album_dir(current_user.user, video["album"]), video["filename"]),
        path.join(target_dir, filename),
    )

    col_videos.find_one_and_update(
        {"_id": video["_id"]},
        {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}},
    )

    return UJSONResponse(
        {
            "id": str(video["_id"]),
            "caption": video["caption"],
            "filename": filename,
        }
    )


video_patch_responses = {
    404: VideoNotFoundError("id").openapi
}


@app.patch("/videos/{id}", description="Change properties of a video", response_model=VideoPublic, responses=video_patch_responses)
async def video_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
    """Replace the caption of one of the current user's videos.

    Args:
        id: String form of the video's ObjectId.
        caption: New caption.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        UJSONResponse with the video id, the updated caption and the filename.

    Raises:
        VideoNotFoundError: *id* is malformed or matches no video of this user.
    """
    video = col_videos.find_one({"_id": _parse_video_id(id), "user": current_user.user})
    if video is None:
        raise VideoNotFoundError(id)

    col_videos.find_one_and_update(
        {"_id": video["_id"]},
        {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}},
    )

    return UJSONResponse(
        {
            "id": str(video["_id"]),
            # Report the caption that was just stored, not the pre-update one.
            "caption": caption,
            "filename": video["filename"],
        }
    )


video_delete_responses = {
    404: VideoNotFoundError("id").openapi
}


@app.delete("/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT, responses=video_delete_responses)
async def video_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
    """Delete one of the current user's videos from the database and from disk.

    Args:
        id: String form of the video's ObjectId.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        Empty 204 response.

    Raises:
        VideoNotFoundError: *id* is malformed or matches no video of this user.
    """
    # Owner-scoped so a user cannot delete somebody else's record.
    video = col_videos.find_one_and_delete({"_id": _parse_video_id(id), "user": current_user.user})
    if video is None:
        raise VideoNotFoundError(id)

    remove(path.join(_album_dir(current_user.user, video["album"]), video["filename"]))

    return Response(status_code=HTTP_204_NO_CONTENT)


video_find_responses = {
    400: SearchPageInvalidError().openapi,
    404: AlbumNameNotFoundError("name").openapi,
    422: VideoSearchQueryEmptyError().openapi
}


@app.get("/albums/{album}/videos", description="Find a video by filename", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_responses)
async def video_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"])):
    """Search an album's videos by filename and/or caption, one page at a time.

    Args:
        album: Name of an album owned by the current user.
        q: Regular expression matched against the filename.
        caption: Regular expression matched against the caption.
        page: 1-based page number.
        page_size: Maximum number of results per page.
        current_user: Authenticated user holding the ``videos.list`` scope.

    Returns:
        UJSONResponse with ``results`` (id, filename, caption per video,
        newest upload first) and ``next_page`` (a token URL, or None when
        there are no further results).

    Raises:
        AlbumNameNotFoundError: The user has no album called *album*.
        SearchPageInvalidError: *page* or *page_size* is not positive.
        VideoSearchQueryEmptyError: Neither *q* nor *caption* was given.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()

    if q is None and caption is None:
        raise VideoSearchQueryEmptyError()

    # A single filter dict drives both the page query and the total count, so
    # the two can never disagree.
    # NOTE(review): q and caption are compiled as-is; an invalid pattern
    # raises re.error here.
    db_query = {"user": current_user.user, "album": album}
    if q is not None:
        db_query["filename"] = re.compile(q)
    if caption is not None:
        db_query["caption"] = re.compile(caption)

    skip = (page - 1) * page_size
    videos = col_videos.find(db_query, limit=page_size, skip=skip).sort("dates.uploaded", DESCENDING)
    results = [
        {"id": str(video["_id"]), "filename": video["filename"], "caption": video["caption"]}
        for video in videos
    ]

    next_page = None
    if col_videos.count_documents(db_query) > page * page_size:
        token = str(token_urlsafe(32))
        col_tokens.insert_one(
            {
                "token": token,
                "query": q,
                # Stored so a follow-up page can re-apply the caption filter.
                "caption": caption,
                "album": album,
                "page": page + 1,
                "page_size": page_size,
                # NOTE(security): the user object is pickled into the token
                # record; whoever can write to this collection controls what
                # gets unpickled later.
                "user": pickle.dumps(current_user),
            }
        )
        next_page = f"/albums/{album}/videos/token?token={token}"

    return UJSONResponse({"results": results, "next_page": next_page})
video_find_token_responses = {
    401: SearchTokenInvalidError().openapi
}


@app.get("/albums/{album}/videos/token", description="Find a video by token", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_token_responses)
async def video_find_token(token: str):
    """Continue a paginated video search using a previously issued token.

    Looks up the token record and re-runs ``video_find`` with the album, page,
    page size, filters and user stored in it.

    Args:
        token: Pagination token taken from a search result's ``next_page`` URL.

    Returns:
        Whatever ``video_find`` returns for the stored search parameters.

    Raises:
        SearchTokenInvalidError: No record exists for *token*.
    """
    found_record = col_tokens.find_one({"token": token})

    if found_record is None:
        raise SearchTokenInvalidError()

    # NOTE(security): pickle.loads executes arbitrary code from its input.
    # This is only safe while the tokens collection is written exclusively by
    # this application.
    stored_user = pickle.loads(found_record["user"])

    return await video_find(
        q=found_record["query"],
        # .get(): records may not carry a caption; None then leaves the
        # caption filter off.
        caption=found_record.get("caption"),
        album=found_record["album"],
        page=found_record["page"],
        page_size=found_record["page_size"],
        current_user=stored_user,
    )