Random media requests #18

Merged
profitroll merged 5 commits from dev into master 2023-06-27 14:54:29 +03:00
4 changed files with 174 additions and 2 deletions
Showing only changes of commit 5e3df74052 - Show all commits

View File

@ -164,6 +164,26 @@ class VideoSearchQueryEmptyError(HTTPException):
) )
class SearchLimitInvalidError(HTTPException):
"""Raises HTTP 400 if search results limit not in valid range."""
def __init__(self):
self.openapi = {
"description": "Invalid Limit",
"content": {
"application/json": {
"example": {
"detail": "Parameter 'limit' must be greater or equal to 1."
}
}
},
}
super().__init__(
status_code=400,
detail=self.openapi["content"]["application/json"]["example"]["detail"],
)
class SearchPageInvalidError(HTTPException): class SearchPageInvalidError(HTTPException):
"""Raises HTTP 400 if page or page size are not in valid range.""" """Raises HTTP 400 if page or page size are not in valid range."""

View File

@ -72,3 +72,11 @@ class SearchResultsPhoto(BaseModel):
class SearchResultsVideo(BaseModel): class SearchResultsVideo(BaseModel):
results: List[VideoSearch] results: List[VideoSearch]
next_page: Union[str, None] next_page: Union[str, None]
class RandomSearchResultsPhoto(BaseModel):
results: List[PhotoSearch]
class RandomSearchResultsVideo(BaseModel):
results: List[VideoSearch]

View File

@ -3,6 +3,7 @@ import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system from os import makedirs, path, remove, system
from pathlib import Path from pathlib import Path
from random import randint
from secrets import token_urlsafe from secrets import token_urlsafe
from shutil import move from shutil import move
from threading import Thread from threading import Thread
@ -26,10 +27,16 @@ from classes.exceptions import (
AlbumNameNotFoundError, AlbumNameNotFoundError,
PhotoNotFoundError, PhotoNotFoundError,
PhotoSearchQueryEmptyError, PhotoSearchQueryEmptyError,
SearchLimitInvalidError,
SearchPageInvalidError, SearchPageInvalidError,
SearchTokenInvalidError, SearchTokenInvalidError,
) )
from classes.models import Photo, PhotoPublic, SearchResultsPhoto from classes.models import (
Photo,
PhotoPublic,
RandomSearchResultsPhoto,
SearchResultsPhoto,
)
from modules.app import app from modules.app import app
from modules.database import col_albums, col_photos, col_tokens from modules.database import col_albums, col_photos, col_tokens
from modules.exif_reader import extract_location from modules.exif_reader import extract_location
@ -445,6 +452,71 @@ async def photo_delete(
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
photo_random_responses = {
400: SearchLimitInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi,
}
@app.get(
"/albums/{album}/photos/random",
description="Get one random photo, optionally by caption",
response_class=UJSONResponse,
response_model=RandomSearchResultsPhoto,
responses=photo_random_responses,
)
async def photo_random(
album: str,
caption: Union[str, None] = None,
limit: int = 100,
current_user: User = Security(get_current_active_user, scopes=["photos.list"]),
):
if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album)
if limit <= 0:
raise SearchLimitInvalidError()
output = {"results": []}
db_query = (
{
"user": current_user.user,
"album": album,
"caption": re.compile(caption),
}
if caption is not None
else {
"user": current_user.user,
"album": album,
}
)
documents_count = col_photos.count_documents(db_query)
skip = randint(0, documents_count - 1) if documents_count > 1 else 0
images = list(
col_photos.aggregate(
[
{"$match": db_query},
{"$skip": skip},
{"$limit": limit},
]
)
)
for image in images:
output["results"].append(
{
"id": image["_id"].__str__(),
"filename": image["filename"],
"caption": image["caption"],
}
)
return UJSONResponse(output)
photo_find_responses = { photo_find_responses = {
400: SearchPageInvalidError().openapi, 400: SearchPageInvalidError().openapi,
401: SearchTokenInvalidError().openapi, 401: SearchTokenInvalidError().openapi,

View File

@ -2,6 +2,7 @@ import re
from datetime import datetime, timezone from datetime import datetime, timezone
from os import makedirs, remove from os import makedirs, remove
from pathlib import Path from pathlib import Path
from random import randint
from secrets import token_urlsafe from secrets import token_urlsafe
from shutil import move from shutil import move
from typing import Union from typing import Union
@ -17,12 +18,18 @@ from starlette.status import HTTP_204_NO_CONTENT
from classes.exceptions import ( from classes.exceptions import (
AlbumNameNotFoundError, AlbumNameNotFoundError,
SearchLimitInvalidError,
SearchPageInvalidError, SearchPageInvalidError,
SearchTokenInvalidError, SearchTokenInvalidError,
VideoNotFoundError, VideoNotFoundError,
VideoSearchQueryEmptyError, VideoSearchQueryEmptyError,
) )
from classes.models import SearchResultsVideo, Video, VideoPublic from classes.models import (
RandomSearchResultsVideo,
SearchResultsVideo,
Video,
VideoPublic,
)
from modules.app import app from modules.app import app
from modules.database import col_albums, col_tokens, col_videos from modules.database import col_albums, col_tokens, col_videos
from modules.security import User, get_current_active_user from modules.security import User, get_current_active_user
@ -262,6 +269,71 @@ async def video_delete(
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
video_random_responses = {
400: SearchLimitInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi,
}
@app.get(
"/albums/{album}/videos/random",
description="Get one random video, optionally by caption",
response_class=UJSONResponse,
response_model=RandomSearchResultsVideo,
responses=video_random_responses,
)
async def video_random(
album: str,
caption: Union[str, None] = None,
limit: int = 100,
current_user: User = Security(get_current_active_user, scopes=["videos.list"]),
):
if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album)
if limit <= 0:
raise SearchLimitInvalidError()
output = {"results": []}
db_query = (
{
"user": current_user.user,
"album": album,
"caption": re.compile(caption),
}
if caption is not None
else {
"user": current_user.user,
"album": album,
}
)
documents_count = col_videos.count_documents(db_query)
skip = randint(0, documents_count - 1) if documents_count > 1 else 0
videos = list(
col_videos.aggregate(
[
{"$match": db_query},
{"$skip": skip},
{"$limit": limit},
]
)
)
for video in videos:
output["results"].append(
{
"id": video["_id"].__str__(),
"filename": video["filename"],
"caption": video["caption"],
}
)
return UJSONResponse(output)
video_find_responses = { video_find_responses = {
400: SearchPageInvalidError().openapi, 400: SearchPageInvalidError().openapi,
401: SearchTokenInvalidError().openapi, 401: SearchTokenInvalidError().openapi,