PhotosAPI/extensions/videos.py

447 lines
12 KiB
Python

import re
from datetime import datetime, timezone
from os import makedirs, remove
from pathlib import Path
from random import randint
from secrets import token_urlsafe
from shutil import move
from typing import Union
import aiofiles
from bson.errors import InvalidId
from bson.objectid import ObjectId
from fastapi import Security, UploadFile
from fastapi.responses import Response, UJSONResponse
from magic import Magic
from pymongo import DESCENDING
from starlette.status import HTTP_204_NO_CONTENT
from classes.exceptions import (
AlbumNameNotFoundError,
SearchLimitInvalidError,
SearchPageInvalidError,
SearchTokenInvalidError,
VideoNotFoundError,
VideoSearchQueryEmptyError,
)
from classes.models import (
RandomSearchResultsVideo,
SearchResultsVideo,
Video,
VideoPublic,
)
from modules.app import app
from modules.database import col_albums, col_tokens, col_videos
from modules.security import User, get_current_active_user
video_post_responses = {404: AlbumNameNotFoundError("name").openapi}


@app.post(
    "/albums/{album}/videos",
    description="Upload a video to album",
    response_class=UJSONResponse,
    response_model=Video,
    responses=video_post_responses,
)
async def video_upload(
    file: UploadFile,
    album: str,
    caption: Union[str, None] = None,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Store an uploaded video in one of the current user's albums.

    The file is written under ``data/users/<user>/albums/<album>/`` and a
    matching document is inserted into ``col_videos``.

    Args:
        file: The uploaded video.
        album: Name of the target album; it must belong to the current user.
        caption: Optional caption stored with the video.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        A JSON response with the new document id, the album, an empty
        ``hash`` placeholder and the filename actually used on disk.

    Raises:
        AlbumNameNotFoundError: If the current user has no album named ``album``.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    album_dir = Path(f"data/users/{current_user.user}/albums/{album}")
    makedirs(album_dir, exist_ok=True)

    # The filename is chosen by the client. Keep only its last path component
    # so values such as "../../x" cannot escape the album directory, and fall
    # back to a generated name when nothing usable is left.
    filename = Path(file.filename or "").name
    if filename in ("", ".", ".."):
        filename = f"video_{int(datetime.now().timestamp())}"

    if (album_dir / filename).exists():
        # Insert a timestamp before the extension; stem/suffix also handle
        # names that have no extension at all.
        taken = Path(filename)
        filename = f"{taken.stem}_{int(datetime.now().timestamp())}{taken.suffix}"

    # Copy in chunks so that a large video is never held in memory as a whole.
    async with aiofiles.open(album_dir / filename, "wb") as f:
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            await f.write(chunk)

    # Hashing and duplicates check should be here

    # Coords extraction should be here

    # One timestamp for both fields so a fresh upload has identical dates.
    now = datetime.now(tz=timezone.utc)
    uploaded = col_videos.insert_one(
        {
            "user": current_user.user,
            "album": album,
            "filename": filename,
            "dates": {
                "uploaded": now,
                "modified": now,
            },
            "caption": caption,
        }
    )

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "album": album,
            "hash": "",  # SHOULD BE DONE
            "filename": filename,
        }
    )
video_get_responses = {
    200: {
        "content": {
            "application/octet-stream": {
                "schema": {
                    "type": "string",
                    "format": "binary",
                    "contentMediaType": "video/*",
                }
            }
        }
    },
    404: VideoNotFoundError("id").openapi,
}


@app.get(
    "/videos/{id}",
    description="Get a video by id",
    responses=video_get_responses,
    response_class=Response,
)
async def video_get(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.read"]),
):
    """Return the binary contents of one of the current user's videos.

    Args:
        id: String form of the video's ``ObjectId``.
        current_user: Authenticated user holding the ``videos.read`` scope.

    Returns:
        A response whose body is the file contents and whose media type is
        the MIME type detected from the file by ``Magic``.

    Raises:
        VideoNotFoundError: If ``id`` is not a valid ``ObjectId`` or no video
            with that id belongs to the current user.
    """
    try:
        video_id = ObjectId(id)
    except InvalidId:
        raise VideoNotFoundError(id) from None

    # The file path below is built from the current user's directory, so the
    # lookup must be limited to that user's documents as well.
    video = col_videos.find_one({"_id": video_id, "user": current_user.user})
    if video is None:
        raise VideoNotFoundError(id)

    video_path = Path(
        f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}"
    )

    mime = Magic(mime=True).from_file(video_path)

    async with aiofiles.open(video_path, "rb") as f:
        video_file = await f.read()

    return Response(content=video_file, media_type=mime)
video_move_responses = {404: VideoNotFoundError("id").openapi}


@app.put(
    "/videos/{id}",
    description="Move a video into another album",
    response_model=VideoPublic,
    responses=video_move_responses,
)
async def video_move(
    id: str,
    album: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Move one of the current user's videos into another of their albums.

    Args:
        id: String form of the video's ``ObjectId``.
        album: Name of the destination album.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        A JSON response with the video id, its caption and the filename it
        has in the destination album.

    Raises:
        VideoNotFoundError: If ``id`` is not a valid ``ObjectId`` or no video
            with that id belongs to the current user.
        AlbumNameNotFoundError: If the current user has no album named ``album``.
    """
    try:
        video_id = ObjectId(id)
    except InvalidId:
        raise VideoNotFoundError(id) from None

    # Only the owner's documents are eligible; the paths below are built from
    # the current user's directory.
    video = col_videos.find_one({"_id": video_id, "user": current_user.user})
    if video is None:
        raise VideoNotFoundError(id)

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    filename = video["filename"]
    if Path(f"data/users/{current_user.user}/albums/{album}/{filename}").exists():
        # Name collision in the destination: insert a timestamp before the
        # extension. stem/suffix also handle names without an extension.
        taken = Path(filename)
        filename = f"{taken.stem}_{int(datetime.now().timestamp())}{taken.suffix}"

    # Move the file first: if this fails, the database record still points at
    # the file's real location.
    move(
        Path(
            f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}"
        ),
        Path(f"data/users/{current_user.user}/albums/{album}/{filename}"),
    )

    col_videos.find_one_and_update(
        {"_id": video_id, "user": current_user.user},
        {
            "$set": {
                "album": album,
                "filename": filename,
                "dates.modified": datetime.now(tz=timezone.utc),
            }
        },
    )

    return UJSONResponse(
        {
            "id": str(video["_id"]),
            "caption": video["caption"],
            "filename": filename,
        }
    )
video_patch_responses = {404: VideoNotFoundError("id").openapi}


@app.patch(
    "/videos/{id}",
    description="Change properties of a video",
    response_model=VideoPublic,
    responses=video_patch_responses,
)
async def video_patch(
    id: str,
    caption: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Replace the caption of one of the current user's videos.

    Args:
        id: String form of the video's ``ObjectId``.
        caption: New caption to store.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        A JSON response with the video id, the caption that was just stored
        and the video's filename.

    Raises:
        VideoNotFoundError: If ``id`` is not a valid ``ObjectId`` or no video
            with that id belongs to the current user.
    """
    try:
        video_id = ObjectId(id)
    except InvalidId:
        raise VideoNotFoundError(id) from None

    # Restrict both the lookup and the update to the caller's own documents.
    owned_video = {"_id": video_id, "user": current_user.user}

    video = col_videos.find_one(owned_video)
    if video is None:
        raise VideoNotFoundError(id)

    col_videos.find_one_and_update(
        owned_video,
        {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}},
    )

    return UJSONResponse(
        {
            "id": str(video["_id"]),
            # `video` was fetched before the update, so report the new
            # caption rather than the stale one it still holds.
            "caption": caption,
            "filename": video["filename"],
        }
    )
video_delete_responses = {404: VideoNotFoundError("id").openapi}


@app.delete(
    "/videos/{id}",
    description="Delete a video by id",
    status_code=HTTP_204_NO_CONTENT,
    responses=video_delete_responses,
)
async def video_delete(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["videos.write"]),
):
    """Delete one of the current user's videos from the database and from disk.

    Args:
        id: String form of the video's ``ObjectId``.
        current_user: Authenticated user holding the ``videos.write`` scope.

    Returns:
        An empty response with status 204.

    Raises:
        VideoNotFoundError: If ``id`` is not a valid ``ObjectId`` or no video
            with that id belongs to the current user.
    """
    try:
        video_id = ObjectId(id)
    except InvalidId:
        raise VideoNotFoundError(id) from None

    # The ownership filter prevents a user from deleting another user's record.
    video = col_videos.find_one_and_delete(
        {"_id": video_id, "user": current_user.user}
    )
    if video is None:
        raise VideoNotFoundError(id)

    try:
        remove(
            Path(
                f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}"
            )
        )
    except FileNotFoundError:
        # The record is already gone; a file that is missing on disk leaves
        # nothing more to clean up, so the deletion still counts as done.
        pass

    return Response(status_code=HTTP_204_NO_CONTENT)
video_random_responses = {
    400: SearchLimitInvalidError().openapi,
    404: AlbumNameNotFoundError("name").openapi,
}


@app.get(
    "/albums/{album}/videos/random",
    description="Get one random video, optionally by caption",
    response_class=UJSONResponse,
    response_model=RandomSearchResultsVideo,
    responses=video_random_responses,
)
async def video_random(
    album: str,
    caption: Union[str, None] = None,
    limit: int = 100,
    current_user: User = Security(get_current_active_user, scopes=["videos.list"]),
):
    """Return up to ``limit`` videos of an album, starting at a random offset.

    Args:
        album: Name of an album owned by the current user.
        caption: Optional pattern, compiled with ``re.compile``, that the
            video caption must match.
        limit: Maximum number of results; must be positive.
        current_user: Authenticated user holding the ``videos.list`` scope.

    Returns:
        A JSON response whose ``results`` list holds the id, filename and
        caption of each selected video.

    Raises:
        AlbumNameNotFoundError: If the current user has no album named ``album``.
        SearchLimitInvalidError: If ``limit`` is zero or negative.
    """
    owned_album = col_albums.find_one({"user": current_user.user, "name": album})
    if owned_album is None:
        raise AlbumNameNotFoundError(album)

    if limit <= 0:
        raise SearchLimitInvalidError()

    # Base filter; the caption pattern is added only when one was supplied.
    db_query = {"user": current_user.user, "album": album}
    if caption is not None:
        db_query["caption"] = re.compile(caption)

    total = col_videos.count_documents(db_query)
    # With zero or one match there is nothing to randomise over.
    offset = 0 if total <= 1 else randint(0, total - 1)

    pipeline = [
        {"$match": db_query},
        {"$skip": offset},
        {"$limit": limit},
    ]
    results = [
        {
            "id": str(entry["_id"]),
            "filename": entry["filename"],
            "caption": entry["caption"],
        }
        for entry in col_videos.aggregate(pipeline)
    ]

    return UJSONResponse({"results": results})
video_find_responses = {
    400: SearchPageInvalidError().openapi,
    401: SearchTokenInvalidError().openapi,
    404: AlbumNameNotFoundError("name").openapi,
    422: VideoSearchQueryEmptyError().openapi,
}


@app.get(
    "/albums/{album}/videos",
    description="Find a video by filename, caption or token",
    response_class=UJSONResponse,
    response_model=SearchResultsVideo,
    responses=video_find_responses,
)
async def video_find(
    album: str,
    q: Union[str, None] = None,
    caption: Union[str, None] = None,
    token: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
    current_user: User = Security(get_current_active_user, scopes=["videos.list"]),
):
    """Search an album's videos by filename and/or caption, one page at a time.

    When ``token`` is given, the search parameters stored for that token in
    ``col_tokens`` replace ``q``, ``caption``, ``page`` and ``page_size``.

    Args:
        album: Name of an album owned by the current user.
        q: Optional pattern, compiled with ``re.compile``, matched against
            the filename.
        caption: Optional pattern, compiled with ``re.compile``, matched
            against the caption.
        token: Optional continuation token from a previous response.
        page: 1-based page number; must be positive.
        page_size: Number of results per page; must be positive.
        current_user: Authenticated user holding the ``videos.list`` scope.

    Returns:
        A JSON response with a ``results`` list (id, filename and caption per
        video, newest upload first) and ``next_page``, which is a URL carrying
        a fresh token when more results exist and ``None`` otherwise.

    Raises:
        SearchTokenInvalidError: If ``token`` is given but unknown.
        AlbumNameNotFoundError: If the current user has no album named ``album``.
        SearchPageInvalidError: If ``page`` or ``page_size`` is not positive.
        VideoSearchQueryEmptyError: If neither ``q`` nor ``caption`` is given.
    """
    if token is not None:
        found_record = col_tokens.find_one({"token": token})

        if found_record is None:
            raise SearchTokenInvalidError()

        # Replay the stored search; `token` is left at its default so this
        # recursion happens at most once.
        return await video_find(
            album=album,
            q=found_record["query"],
            caption=found_record["caption"],
            page=found_record["page"],
            page_size=found_record["page_size"],
            current_user=current_user,
        )

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()

    if q is None and caption is None:
        raise VideoSearchQueryEmptyError()

    # One filter serves both the page query and the total count, so the two
    # can never disagree about which fields are being matched.
    # NOTE(review): an invalid pattern makes re.compile raise re.error, which
    # is not handled here.
    db_query = {"user": current_user.user, "album": album}
    if q is not None:
        db_query["filename"] = re.compile(q)
    if caption is not None:
        db_query["caption"] = re.compile(caption)

    skip = (page - 1) * page_size
    videos = col_videos.find(db_query, limit=page_size, skip=skip).sort(
        "dates.uploaded", DESCENDING
    )

    output: dict = {
        "results": [
            {
                "id": str(video["_id"]),
                "filename": video["filename"],
                "caption": video["caption"],
            }
            for video in videos
        ]
    }

    if col_videos.count_documents(db_query) > page * page_size:
        # More matches remain: store the parameters of the next page under a
        # new token and hand that token back to the client.
        next_token = token_urlsafe(32)
        col_tokens.insert_one(
            {
                "token": next_token,
                "query": q,
                "caption": caption,
                "page": page + 1,
                "page_size": page_size,
            }
        )
        output["next_page"] = f"/albums/{album}/videos/?token={next_token}"
    else:
        output["next_page"] = None

    return UJSONResponse(output)