PhotosAPI/extensions/videos.py

243 lines
10 KiB
Python

import re
import pickle
from secrets import token_urlsafe
from shutil import move
from typing import Union
from magic import Magic
from datetime import datetime, timezone
from os import makedirs, path, remove
from classes.exceptions import AlbumNameNotFoundError, SearchPageInvalidError, SearchTokenInvalidError, VideoNotFoundError, VideoSearchQueryEmptyError
from classes.models import Video, SearchResultsVideo, VideoPublic
#from modules.unified_exif_reader import extract_location
from modules.security import User, get_current_active_user
from modules.app import app
from modules.database import col_videos, col_albums, col_tokens
from bson.objectid import ObjectId
from bson.errors import InvalidId
from pymongo import DESCENDING
from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT
video_post_responses = {
404: AlbumNameNotFoundError("name").openapi
}
@app.post("/albums/{album}/videos", description="Upload a video to album", response_class=UJSONResponse, response_model=Video, responses=video_post_responses)
async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise AlbumNameNotFoundError(album)
# if not file.content_type.startswith("video"):
# raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not a video, not accepting.")
makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True)
filename = file.filename
if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)):
base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f:
f.write(await file.read())
# file_hash = await get_phash(path.join("data", "users", current_user.user, "albums", album, filename))
# duplicates = await get_duplicates(file_hash, album)
# if len(duplicates) > 0 and ignore_duplicates is False:
# return UJSONResponse(
# {
# "detail": "video duplicates found. Pass 'ignore_duplicates=true' to ignore.",
# "duplicates": duplicates
# },
# status_code=HTTP_409_CONFLICT
# )
#coords = extract_location(path.join("data", "users", current_user.user, "albums", album, filename))
uploaded = col_videos.insert_one(
{
"user": current_user.user,
"album": album,
"filename": filename,
"dates": {
"uploaded": datetime.now(tz=timezone.utc),
"modified": datetime.now(tz=timezone.utc)
},
"caption": caption,
# "location": [
# coords["lng"],
# coords["lat"],
# coords["alt"]
# ]
}
)
return UJSONResponse(
{
"id": uploaded.inserted_id.__str__(),
"album": album,
"hash": "", # SHOULD BE DONE
"filename": filename
}
)
video_get_responses = {
404: VideoNotFoundError("id").openapi
}
@app.get("/videos/{id}", description="Get a video by id", responses=video_get_responses)
async def video_get(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.read"])):
try:
video = col_videos.find_one( {"_id": ObjectId(id)} )
if video is None:
raise InvalidId(id)
except InvalidId:
raise VideoNotFoundError(id)
video_path = path.join("data", "users", current_user.user, "albums", video["album"], video["filename"])
mime = Magic(mime=True).from_file(video_path)
with open(video_path, "rb") as f: video_file = f.read()
return Response(video_file, media_type=mime)
video_move_responses = {
404: VideoNotFoundError("id").openapi
}
@app.put("/videos/{id}", description="Move a video into another album", response_model=VideoPublic, responses=video_move_responses)
async def video_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try:
video = col_videos.find_one( {"_id": ObjectId(id)} )
if video is None:
raise InvalidId(id)
except InvalidId:
raise VideoNotFoundError(id)
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise AlbumNameNotFoundError(album)
if path.exists(path.join("data", "users", current_user.user, "albums", album, video["filename"])):
base_name = video["filename"].split(".")[:-1]
extension = video["filename"].split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
else:
filename = video["filename"]
col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}} )
move(
path.join("data", "users", current_user.user, "albums", video["album"], video["filename"]),
path.join("data", "users", current_user.user, "albums", album, filename)
)
return UJSONResponse(
{
"id": video["_id"].__str__(),
"caption": video["caption"],
"filename": filename
}
)
video_patch_responses = {
404: VideoNotFoundError("id").openapi
}
@app.patch("/videos/{id}", description="Change properties of a video", response_model=VideoPublic, responses=video_patch_responses)
async def video_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try:
video = col_videos.find_one( {"_id": ObjectId(id)} )
if video is None:
raise InvalidId(id)
except InvalidId:
raise VideoNotFoundError(id)
col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} )
return UJSONResponse(
{
"id": video["_id"].__str__(),
"caption": video["caption"],
"filename": video["filename"]
}
)
video_delete_responses = {
404: VideoNotFoundError("id").openapi
}
@app.delete("/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT, responses=video_delete_responses)
async def video_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])):
try:
video = col_videos.find_one_and_delete( {"_id": ObjectId(id)} )
if video is None:
raise InvalidId(id)
except InvalidId:
raise VideoNotFoundError(id)
album = col_albums.find_one( {"name": video["album"]} )
remove(path.join("data", "users", current_user.user, "albums", video["album"], video["filename"]))
return Response(status_code=HTTP_204_NO_CONTENT)
video_find_responses = {
400: SearchPageInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi,
422: VideoSearchQueryEmptyError().openapi
}
@app.get("/albums/{album}/videos", description="Find a video by filename", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_responses)
async def video_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise AlbumNameNotFoundError(album)
if page <= 0 or page_size <= 0:
raise SearchPageInvalidError()
output = {"results": []}
skip = (page-1)*page_size
if q is None and caption is None:
raise VideoSearchQueryEmptyError()
if q is None and caption is not None:
db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)}
db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(caption)}
elif q is not None and caption is None:
db_query = list(col_videos.find({"user": current_user.user, "album": album, "filename": re.compile(q)}, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING))
db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(q)}
else:
db_query = list(col_videos.find({"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)}, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING)) # type: ignore
db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore
videos = list(col_videos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING))
for video in videos:
output["results"].append({"id": video["_id"].__str__(), "filename": video["filename"], "caption": video["caption"]})
if col_videos.count_documents( db_query_count ) > page*page_size:
token = str(token_urlsafe(32))
col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} )
output["next_page"] = f"/albums/{album}/videos/token?token={token}" # type: ignore
else:
output["next_page"] = None # type: ignore
return UJSONResponse(output)
video_find_token_responses = {
401: SearchTokenInvalidError().openapi
}
@app.get("/albums/{album}/videos/token", description="Find a video by token", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_token_responses)
async def video_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} )
if found_record is None:
raise SearchTokenInvalidError()
return await video_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"]))