diff --git a/classes/exceptions.py b/classes/exceptions.py index cea98d6..1bec769 100644 --- a/classes/exceptions.py +++ b/classes/exceptions.py @@ -3,51 +3,52 @@ from typing import Literal class AlbumNotFoundError(Exception): """Raises HTTP 404 if no album with this ID found.""" + def __init__(self, id: str): self.id = id self.openapi = { "description": "Album Does Not Exist", "content": { "application/json": { - "example": { - "detail": "Could not find album with id '{id}'." - } + "example": {"detail": "Could not find album with id '{id}'."} } - } + }, } + class AlbumNameNotFoundError(Exception): """Raises HTTP 404 if no album with this name found.""" + def __init__(self, name: str): self.name = name self.openapi = { "description": "Album Does Not Exist", "content": { "application/json": { - "example": { - "detail": "Could not find album with name '{name}'." - } + "example": {"detail": "Could not find album with name '{name}'."} } - } + }, } + class AlbumAlreadyExistsError(Exception): """Raises HTTP 409 if album with this name already exists.""" + def __init__(self, name: str): self.name = name self.openapi = { "description": "Album Already Exists", "content": { "application/json": { - "example": { - "detail": "Album with name '{name}' already exists." - } + "example": {"detail": "Album with name '{name}' already exists."} } - } + }, } + class AlbumIncorrectError(Exception): """Raises HTTP 406 if album's title or name is invalid.""" + def __init__(self, place: Literal["name", "title"], error: str) -> None: self.place = place self.error = error @@ -55,30 +56,30 @@ class AlbumIncorrectError(Exception): "description": "Album Name/Title Invalid", "content": { "application/json": { - "example": { - "detail": "Album {name/title} invalid: {error}" - } + "example": {"detail": "Album {name/title} invalid: {error}"} } - } + }, } + class PhotoNotFoundError(Exception): """Raises HTTP 404 if no photo with this ID found.""" + def __init__(self, id: str): self.id = id self.openapi = { "description": "Photo Does Not Exist", "content": { "application/json": { - "example": { - "detail": "Could not find photo with id '{id}'." - } + "example": {"detail": "Could not find photo with id '{id}'."} } - } + }, } + class PhotoSearchQueryEmptyError(Exception): """Raises HTTP 422 if no photo search query provided.""" + def __init__(self): self.openapi = { "description": "Invalid Query", @@ -88,26 +89,28 @@ class PhotoSearchQueryEmptyError(Exception): "detail": "You must provide query, caption or coordinates to look for photos." } } - } + }, } + class VideoNotFoundError(Exception): """Raises HTTP 404 if no video with this ID found.""" + def __init__(self, id: str): self.id = id self.openapi = { "description": "Video Does Not Exist", "content": { "application/json": { - "example": { - "detail": "Could not find video with id '{id}'." - } + "example": {"detail": "Could not find video with id '{id}'."} } - } + }, } + class VideoSearchQueryEmptyError(Exception): """Raises HTTP 422 if no video search query provided.""" + def __init__(self): self.openapi = { "description": "Invalid Query", @@ -117,11 +120,13 @@ class VideoSearchQueryEmptyError(Exception): "detail": "You must provide query or caption to look for videos." } } - } + }, } + class SearchPageInvalidError(Exception): """Raises HTTP 400 if page or page size are not in valid range.""" + def __init__(self): self.openapi = { "description": "Invalid Page", @@ -131,75 +136,69 @@ class SearchPageInvalidError(Exception): "detail": "Parameters 'page' and 'page_size' must be greater or equal to 1." } } - } + }, } + class SearchTokenInvalidError(Exception): """Raises HTTP 401 if search token is not valid.""" + def __init__(self): self.openapi = { "description": "Invalid Token", "content": { - "application/json": { - "example": { - "detail": "Invalid search token." - } - } - } + "application/json": {"example": {"detail": "Invalid search token."}} + }, } + class UserEmailCodeInvalid(Exception): """Raises HTTP 400 if email confirmation code is not valid.""" + def __init__(self): self.openapi = { "description": "Invalid Email Code", "content": { "application/json": { - "example": { - "detail": "Confirmation code is invalid." - } + "example": {"detail": "Confirmation code is invalid."} } - } + }, } + class UserAlreadyExists(Exception): """Raises HTTP 409 if user with this name already exists.""" + def __init__(self): self.openapi = { "description": "User Already Exists", "content": { "application/json": { - "example": { - "detail": "User with this username already exists." - } + "example": {"detail": "User with this username already exists."} } - } + }, } + class AccessTokenInvalidError(Exception): """Raises HTTP 401 if access token is not valid.""" + def __init__(self): self.openapi = { "description": "Invalid Access Token", "content": { - "application/json": { - "example": { - "detail": "Invalid access token." - } - } - } + "application/json": {"example": {"detail": "Invalid access token."}} + }, } + class UserCredentialsInvalid(Exception): """Raises HTTP 401 if user credentials are not valid.""" + def __init__(self): self.openapi = { "description": "Invalid Credentials", "content": { - "application/json": { - "example": { - "detail": "Invalid credentials." - } - } - } - } \ No newline at end of file + "application/json": {"example": {"detail": "Invalid credentials."}} + }, + } diff --git a/classes/models.py b/classes/models.py index 6b3d521..5e072b8 100644 --- a/classes/models.py +++ b/classes/models.py @@ -8,11 +8,13 @@ class Photo(BaseModel): hash: str filename: str + class PhotoPublic(BaseModel): id: str caption: str filename: str + class PhotoSearch(BaseModel): id: str filename: str @@ -25,11 +27,13 @@ class Video(BaseModel): hash: str filename: str + class VideoPublic(BaseModel): id: str caption: str filename: str + class VideoSearch(BaseModel): id: str filename: str @@ -41,11 +45,13 @@ class Album(BaseModel): name: str title: str + class AlbumSearch(BaseModel): id: str name: str title: str + class AlbumModified(BaseModel): name: str title: str @@ -56,10 +62,12 @@ class SearchResultsAlbum(BaseModel): results: List[Album] next_page: Union[str, None] + class SearchResultsPhoto(BaseModel): results: List[PhotoSearch] next_page: Union[str, None] + class SearchResultsVideo(BaseModel): results: List[VideoSearch] - next_page: Union[str, None] \ No newline at end of file + next_page: Union[str, None] diff --git a/extensions/albums.py b/extensions/albums.py index 7cb4663..1eb130a 100644 --- a/extensions/albums.py +++ b/extensions/albums.py @@ -2,69 +2,108 @@ import re from os import makedirs, path, rename from shutil import rmtree from typing import Union -from classes.exceptions import AlbumAlreadyExistsError, AlbumIncorrectError, AlbumNotFoundError + +from bson.errors import InvalidId +from bson.objectid import ObjectId +from fastapi import Security +from fastapi.responses import Response, UJSONResponse +from starlette.status import HTTP_204_NO_CONTENT + +from classes.exceptions import ( + AlbumAlreadyExistsError, + AlbumIncorrectError, + AlbumNotFoundError, +) from classes.models import Album, AlbumModified, SearchResultsAlbum from modules.app import app -from modules.database import col_photos, col_albums +from modules.database import col_albums, col_photos from modules.security import User, get_current_active_user -from bson.objectid import ObjectId -from bson.errors import InvalidId - -from fastapi import Security -from fastapi.responses import UJSONResponse, Response -from starlette.status import HTTP_204_NO_CONTENT album_create_responses = { 406: AlbumIncorrectError("name", "error").openapi, - 409: AlbumAlreadyExistsError("name").openapi + 409: AlbumAlreadyExistsError("name").openapi, } -@app.post("/albums", description="Create album with name and title", response_class=UJSONResponse, response_model=Album, responses=album_create_responses) -async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])): - if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False: + +@app.post( + "/albums", + description="Create album with name and title", + response_class=UJSONResponse, + response_model=Album, + responses=album_create_responses, +) +async def album_create( + name: str, + title: str, + current_user: User = Security(get_current_active_user, scopes=["albums.write"]), +): + if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.") - + if 2 > len(name) > 20: raise AlbumIncorrectError("name", "must be >2 and <20 characters.") if 2 > len(title) > 40: raise AlbumIncorrectError("title", "must be >2 and <40 characters.") - if col_albums.find_one( {"name": name} ) is not None: + if col_albums.find_one({"name": name}) is not None: raise AlbumAlreadyExistsError(name) - makedirs(path.join("data", "users", current_user.user, "albums", name), exist_ok=True) - - uploaded = col_albums.insert_one( {"user": current_user.user, "name": name, "title": title, "cover": None} ) - - return UJSONResponse( - { - "id": uploaded.inserted_id.__str__(), - "name": name, - "title": title - } + makedirs( + path.join("data", "users", current_user.user, "albums", name), exist_ok=True ) -@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum) -async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])): + uploaded = col_albums.insert_one( + {"user": current_user.user, "name": name, "title": title, "cover": None} + ) + return UJSONResponse( + {"id": uploaded.inserted_id.__str__(), "name": name, "title": title} + ) + + +@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum) +async def album_find( + q: str, + current_user: User = Security(get_current_active_user, scopes=["albums.list"]), +): output = {"results": []} - albums = list(col_albums.find( {"user": current_user.user, "name": re.compile(q)} )) + albums = list(col_albums.find({"user": current_user.user, "name": re.compile(q)})) for album in albums: - output["results"].append( {"id": album["_id"].__str__(), "name": album["name"], "title": album["title"]} ) + output["results"].append( + { + "id": album["_id"].__str__(), + "name": album["name"], + "title": album["title"], + } + ) return UJSONResponse(output) + album_patch_responses = { 404: AlbumNotFoundError("id").openapi, - 406: AlbumIncorrectError("name", "error").openapi + 406: AlbumIncorrectError("name", "error").openapi, } -@app.patch("/albums/{id}", description="Modify album's name or title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_patch_responses) -async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])): + +@app.patch( + "/albums/{id}", + description="Modify album's name or title by id", + response_class=UJSONResponse, + response_model=AlbumModified, + responses=album_patch_responses, +) +async def album_patch( + id: str, + name: Union[str, None] = None, + title: Union[str, None] = None, + cover: Union[str, None] = None, + current_user: User = Security(get_current_active_user, scopes=["albums.write"]), +): try: - album = col_albums.find_one( {"_id": ObjectId(id)} ) + album = col_albums.find_one({"_id": ObjectId(id)}) if album is None: raise InvalidId(id) except InvalidId: @@ -77,49 +116,64 @@ async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, title = album["title"] if name is not None: - if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False: - raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.") + if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: + raise AlbumIncorrectError( + "name", "can only contain a-z, 0-9 and _ characters." + ) if 2 > len(name) > 20: raise AlbumIncorrectError("name", "must be >2 and <20 characters.") rename( path.join("data", "users", current_user.user, "albums", album["name"]), - path.join("data", "users", current_user.user, "albums", name) + path.join("data", "users", current_user.user, "albums", name), + ) + col_photos.update_many( + {"user": current_user.user, "album": album["name"]}, + {"$set": {"album": name}}, ) - col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} ) else: name = album["name"] if cover is not None: - image = col_photos.find_one( {"_id": ObjectId(cover), "album": album["name"]} ) + image = col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]}) cover = image["_id"].__str__() if image is not None else album["cover"] else: cover = album["cover"] - col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} ) - - return UJSONResponse( - { - "name": name, - "title": title, - "cover": cover - } + col_albums.update_one( + {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} ) + return UJSONResponse({"name": name, "title": title, "cover": cover}) + + album_put_responses = { 404: AlbumNotFoundError("id").openapi, - 406: AlbumIncorrectError("name", "error").openapi + 406: AlbumIncorrectError("name", "error").openapi, } -@app.put("/albums/{id}", description="Modify album's name and title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_put_responses) -async def album_put(id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])): + +@app.put( + "/albums/{id}", + description="Modify album's name and title by id", + response_class=UJSONResponse, + response_model=AlbumModified, + responses=album_put_responses, +) +async def album_put( + id: str, + name: str, + title: str, + cover: str, + current_user: User = Security(get_current_active_user, scopes=["albums.write"]), +): try: - album = col_albums.find_one( {"_id": ObjectId(id)} ) + album = col_albums.find_one({"_id": ObjectId(id)}) if album is None: raise InvalidId(id) except InvalidId: raise AlbumNotFoundError(id) - if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False: + if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.") if 2 > len(name) > 20: @@ -128,40 +182,46 @@ async def album_put(id: str, name: str, title: str, cover: str, current_user: Us if 2 > len(title) > 40: raise AlbumIncorrectError("title", "must be >2 and <40 characters.") - image = col_photos.find_one( {"_id": ObjectId(cover), "album": album["name"]} ) - cover = image["_id"].__str__() if image is not None else None # type: ignore - + image = col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]}) + cover = image["_id"].__str__() if image is not None else None # type: ignore + rename( path.join("data", "users", current_user.user, "albums", album["name"]), - path.join("data", "users", current_user.user, "albums", name) + path.join("data", "users", current_user.user, "albums", name), ) - col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} ) - col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} ) - - return UJSONResponse( - { - "name": name, - "title": title, - "cover": cover - } + col_photos.update_many( + {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} + ) + col_albums.update_one( + {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} ) -album_delete_responses = { - 404: AlbumNotFoundError("id").openapi -} -@app.delete("/album/{id}", description="Delete album by id", status_code=HTTP_204_NO_CONTENT, responses=album_delete_responses) -async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])): + return UJSONResponse({"name": name, "title": title, "cover": cover}) + +album_delete_responses = {404: AlbumNotFoundError("id").openapi} + + +@app.delete( + "/album/{id}", + description="Delete album by id", + status_code=HTTP_204_NO_CONTENT, + responses=album_delete_responses, +) +async def album_delete( + id: str, + current_user: User = Security(get_current_active_user, scopes=["albums.write"]), +): try: - album = col_albums.find_one_and_delete( {"_id": ObjectId(id)} ) + album = col_albums.find_one_and_delete({"_id": ObjectId(id)}) if album is None: raise InvalidId(id) except InvalidId: raise AlbumNotFoundError(id) - - col_photos.delete_many( {"album": album["name"]} ) + + col_photos.delete_many({"album": album["name"]}) rmtree(path.join("data", "users", current_user.user, "albums", album["name"])) - return Response(status_code=HTTP_204_NO_CONTENT) \ No newline at end of file + return Response(status_code=HTTP_204_NO_CONTENT) diff --git a/extensions/exceptions.py b/extensions/exceptions.py index df8caae..2bfda00 100644 --- a/extensions/exceptions.py +++ b/extensions/exceptions.py @@ -1,8 +1,16 @@ -from fastapi import Request +from fastapi import Request from fastapi.responses import UJSONResponse from modules.app import app from classes.exceptions import * -from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY +from starlette.status import ( + HTTP_400_BAD_REQUEST, + HTTP_401_UNAUTHORIZED, + HTTP_404_NOT_FOUND, + HTTP_406_NOT_ACCEPTABLE, + HTTP_409_CONFLICT, + HTTP_422_UNPROCESSABLE_ENTITY, +) + @app.exception_handler(AlbumNotFoundError) async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError): @@ -11,13 +19,17 @@ async def album_not_found_exception_handler(request: Request, exc: AlbumNotFound content={"detail": f"Could not find album with id '{exc.id}'."}, ) + @app.exception_handler(AlbumAlreadyExistsError) -async def album_already_exists_exception_handler(request: Request, exc: AlbumAlreadyExistsError): +async def album_already_exists_exception_handler( + request: Request, exc: AlbumAlreadyExistsError +): return UJSONResponse( status_code=HTTP_409_CONFLICT, content={"detail": f"Album with name '{exc.name}' already exists."}, ) + @app.exception_handler(AlbumIncorrectError) async def album_incorrect_exception_handler(request: Request, exc: AlbumIncorrectError): return UJSONResponse( @@ -25,6 +37,7 @@ async def album_incorrect_exception_handler(request: Request, exc: AlbumIncorrec content={"detail": f"Album {exc.place} invalid: {exc.error}"}, ) + @app.exception_handler(PhotoNotFoundError) async def photo_not_found_exception_handler(request: Request, exc: PhotoNotFoundError): return UJSONResponse( @@ -32,13 +45,19 @@ async def photo_not_found_exception_handler(request: Request, exc: PhotoNotFound content={"detail": f"Could not find photo with id '{exc.id}'."}, ) + @app.exception_handler(PhotoSearchQueryEmptyError) -async def photo_search_query_empty_exception_handler(request: Request, exc: PhotoSearchQueryEmptyError): +async def photo_search_query_empty_exception_handler( + request: Request, exc: PhotoSearchQueryEmptyError +): return UJSONResponse( status_code=HTTP_422_UNPROCESSABLE_ENTITY, - content={"detail": "You must provide query, caption or coordinates to look for photos."}, + content={ + "detail": "You must provide query, caption or coordinates to look for photos." + }, ) + @app.exception_handler(VideoNotFoundError) async def video_not_found_exception_handler(request: Request, exc: VideoNotFoundError): return UJSONResponse( @@ -46,51 +65,78 @@ async def video_not_found_exception_handler(request: Request, exc: VideoNotFound content={"detail": f"Could not find video with id '{exc.id}'."}, ) + @app.exception_handler(VideoSearchQueryEmptyError) -async def video_search_query_empty_exception_handler(request: Request, exc: VideoSearchQueryEmptyError): +async def video_search_query_empty_exception_handler( + request: Request, exc: VideoSearchQueryEmptyError +): return UJSONResponse( status_code=HTTP_422_UNPROCESSABLE_ENTITY, - content={"detail": "You must provide query, caption or coordinates to look for photos."}, + content={ + "detail": "You must provide query, caption or coordinates to look for photos." + }, ) + @app.exception_handler(SearchPageInvalidError) -async def search_page_invalid_exception_handler(request: Request, exc: SearchPageInvalidError): +async def search_page_invalid_exception_handler( + request: Request, exc: SearchPageInvalidError +): return UJSONResponse( status_code=HTTP_400_BAD_REQUEST, - content={"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."}, + content={ + "detail": "Parameters 'page' and 'page_size' must be greater or equal to 1." + }, ) + @app.exception_handler(SearchTokenInvalidError) -async def search_token_invalid_exception_handler(request: Request, exc: SearchTokenInvalidError): +async def search_token_invalid_exception_handler( + request: Request, exc: SearchTokenInvalidError +): return UJSONResponse( status_code=HTTP_401_UNAUTHORIZED, - content={"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."}, + content={ + "detail": "Parameters 'page' and 'page_size' must be greater or equal to 1." + }, ) + @app.exception_handler(UserEmailCodeInvalid) -async def user_email_code_invalid_exception_handler(request: Request, exc: UserEmailCodeInvalid): +async def user_email_code_invalid_exception_handler( + request: Request, exc: UserEmailCodeInvalid +): return UJSONResponse( status_code=HTTP_400_BAD_REQUEST, content={"detail": "Confirmation code is invalid."}, ) + @app.exception_handler(UserAlreadyExists) -async def user_already_exists_exception_handler(request: Request, exc: UserAlreadyExists): +async def user_already_exists_exception_handler( + request: Request, exc: UserAlreadyExists +): return UJSONResponse( status_code=HTTP_409_CONFLICT, content={"detail": "User with this username already exists."}, ) + @app.exception_handler(AccessTokenInvalidError) -async def access_token_invalid_exception_handler(request: Request, exc: AccessTokenInvalidError): +async def access_token_invalid_exception_handler( + request: Request, exc: AccessTokenInvalidError +): return UJSONResponse( status_code=HTTP_401_UNAUTHORIZED, content={"detail": "Invalid access token."}, ) + @app.exception_handler(UserCredentialsInvalid) -async def user_credentials_invalid_exception_handler(request: Request, exc: UserCredentialsInvalid): +async def user_credentials_invalid_exception_handler( + request: Request, exc: UserCredentialsInvalid +): return UJSONResponse( status_code=HTTP_401_UNAUTHORIZED, content={"detail": "Invalid credentials."}, - ) \ No newline at end of file + ) diff --git a/extensions/pages.py b/extensions/pages.py index 3b7a120..b712a84 100644 --- a/extensions/pages.py +++ b/extensions/pages.py @@ -2,26 +2,30 @@ from os import path from modules.app import app from fastapi.responses import HTMLResponse, Response + @app.get("/pages/matter.css", include_in_schema=False) async def page_matter(): with open(path.join("pages", "matter.css"), "r", encoding="utf-8") as f: output = f.read() return Response(content=output) + @app.get("/pages/{page}/{file}", include_in_schema=False) -async def page_assets(page:str, file: str): +async def page_assets(page: str, file: str): with open(path.join("pages", page, file), "r", encoding="utf-8") as f: output = f.read() return Response(content=output) + @app.get("/", include_in_schema=False) async def page_home(): with open(path.join("pages", "home", "index.html"), "r", encoding="utf-8") as f: output = f.read() return HTMLResponse(content=output) + @app.get("/register", include_in_schema=False) async def page_register(): with open(path.join("pages", "register", "index.html"), "r", encoding="utf-8") as f: output = f.read() - return HTMLResponse(content=output) \ No newline at end of file + return HTMLResponse(content=output) diff --git a/extensions/photos.py b/extensions/photos.py index 7061c96..8604af3 100644 --- a/extensions/photos.py +++ b/extensions/photos.py @@ -10,12 +10,27 @@ from datetime import datetime, timedelta, timezone from os import makedirs, path, remove, system from pydantic import ValidationError -from classes.exceptions import AccessTokenInvalidError, AlbumNameNotFoundError, PhotoNotFoundError, PhotoSearchQueryEmptyError, SearchPageInvalidError, SearchTokenInvalidError +from classes.exceptions import ( + AccessTokenInvalidError, + AlbumNameNotFoundError, + PhotoNotFoundError, + PhotoSearchQueryEmptyError, + SearchPageInvalidError, + SearchTokenInvalidError, +) from classes.models import Photo, PhotoPublic, SearchResultsPhoto from modules.exif_reader import extract_location from modules.hasher import get_phash, get_duplicates from modules.scheduler import scheduler -from modules.security import ALGORITHM, SECRET_KEY, TokenData, User, create_access_token, get_current_active_user, get_user +from modules.security import ( + ALGORITHM, + SECRET_KEY, + TokenData, + User, + create_access_token, + get_current_active_user, + get_user, +) from modules.app import app from modules.database import col_photos, col_albums, col_tokens from pymongo import DESCENDING @@ -27,12 +42,16 @@ from jose import JWTError, jwt from fastapi import UploadFile, Security from fastapi.responses import UJSONResponse, Response from fastapi.exceptions import HTTPException -from starlette.status import HTTP_204_NO_CONTENT, HTTP_401_UNAUTHORIZED, HTTP_409_CONFLICT +from starlette.status import ( + HTTP_204_NO_CONTENT, + HTTP_401_UNAUTHORIZED, + HTTP_409_CONFLICT, +) from modules.utils import configGet, logWrite -async def compress_image(image_path: str): +async def compress_image(image_path: str): image_type = Magic(mime=True).from_file(image_path) if image_type not in ["image/jpeg", "image/png"]: @@ -42,7 +61,10 @@ async def compress_image(image_path: str): size_before = path.getsize(image_path) / 1024 if image_type == "image/jpeg": - task = Thread(target=system, kwargs={"command": f'jpegoptim "{image_path}" -o --max=55 -p --strip-none'}) + task = Thread( + target=system, + kwargs={"command": f'jpegoptim "{image_path}" -o --max=55 -p --strip-none'}, + ) elif image_type == "image/png": task = Thread(target=system, kwargs={"command": f'optipng -o3 "{image_path}"'}) else: @@ -53,7 +75,10 @@ async def compress_image(image_path: str): task.join() size_after = path.getsize(image_path) / 1024 - logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb") + logWrite( + f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb" + ) + photo_post_responses = { 404: AlbumNameNotFoundError("name").openapi, @@ -63,34 +88,56 @@ photo_post_responses = { "application/json": { "example": { "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.", - "duplicates": [ - "string" - ], - "access_token": "string" + "duplicates": ["string"], + "access_token": "string", } } - } - } + }, + }, } -@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo, responses=photo_post_responses) -async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): - if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: + +@app.post( + "/albums/{album}/photos", + description="Upload a photo to album", + response_class=UJSONResponse, + response_model=Photo, + responses=photo_post_responses, +) +async def photo_upload( + file: UploadFile, + album: str, + ignore_duplicates: bool = False, + compress: bool = True, + caption: Union[str, None] = None, + current_user: User = Security(get_current_active_user, scopes=["photos.write"]), +): + if col_albums.find_one({"user": current_user.user, "name": album}) is None: raise AlbumNameNotFoundError(album) - makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True) + makedirs( + path.join("data", "users", current_user.user, "albums", album), exist_ok=True + ) filename = file.filename - if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)): + if path.exists( + path.join("data", "users", current_user.user, "albums", album, file.filename) + ): base_name = file.filename.split(".")[:-1] extension = file.filename.split(".")[-1] - filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension + filename = ( + ".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension + ) - with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f: + with open( + path.join("data", "users", current_user.user, "albums", album, filename), "wb" + ) as f: f.write(await file.read()) - file_hash = await get_phash(path.join("data", "users", current_user.user, "albums", album, filename)) + file_hash = await get_phash( + path.join("data", "users", current_user.user, "albums", album, filename) + ) duplicates = await get_duplicates(file_hash, album) if len(duplicates) > 0 and ignore_duplicates is False: @@ -98,73 +145,90 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F duplicates_ids = [] for entry in duplicates: duplicates_ids.append(entry["id"]) - access_token = create_access_token(data={"sub": current_user.user, "scopes": ["me", "photos.read"], "allowed": duplicates_ids}, expires_delta=timedelta(hours=configGet("media_token_valid_hours"))) + access_token = create_access_token( + data={ + "sub": current_user.user, + "scopes": ["me", "photos.read"], + "allowed": duplicates_ids, + }, + expires_delta=timedelta(hours=configGet("media_token_valid_hours")), + ) access_token_short = uuid4().hex[:12].lower() - col_tokens.insert_one({"short": access_token_short, "access_token": access_token, "photos": duplicates_ids}) + col_tokens.insert_one( + { + "short": access_token_short, + "access_token": access_token, + "photos": duplicates_ids, + } + ) else: access_token_short = None return UJSONResponse( { "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.", "duplicates": duplicates, - "access_token": access_token_short + "access_token": access_token_short, }, - status_code=HTTP_409_CONFLICT + status_code=HTTP_409_CONFLICT, ) try: - coords = extract_location(path.join("data", "users", current_user.user, "albums", album, filename)) + coords = extract_location( + path.join("data", "users", current_user.user, "albums", album, filename) + ) except (UnpackError, ValueError): - coords = { - "lng": 0.0, - "lat": 0.0, - "alt": 0.0 - } + coords = {"lng": 0.0, "lat": 0.0, "alt": 0.0} uploaded = col_photos.insert_one( { "user": current_user.user, - "album": album, + "album": album, "hash": file_hash, "filename": filename, "dates": { "uploaded": datetime.now(tz=timezone.utc), - "modified": datetime.now(tz=timezone.utc) + "modified": datetime.now(tz=timezone.utc), }, - "location": [ - coords["lng"], - coords["lat"], - coords["alt"] - ], - "caption": caption + "location": [coords["lng"], coords["lat"], coords["alt"]], + "caption": caption, } ) if compress is True: - scheduler.add_job(compress_image, trigger="date", run_date=datetime.now()+timedelta(seconds=1), args=[path.join("data", "users", current_user.user, "albums", album, filename)]) + scheduler.add_job( + compress_image, + trigger="date", + run_date=datetime.now() + timedelta(seconds=1), + args=[ + path.join("data", "users", current_user.user, "albums", album, filename) + ], + ) return UJSONResponse( { "id": uploaded.inserted_id.__str__(), "album": album, "hash": file_hash, - "filename": filename + "filename": filename, } ) + # Access to photos y token generated for example by # upload method when duplicates are found. Is disabled # by default and should remain so if not really needed. if configGet("media_token_access") is True: - photo_get_token_responses = { 401: AccessTokenInvalidError().openapi, - 404: PhotoNotFoundError("id").openapi + 404: PhotoNotFoundError("id").openapi, } - @app.get("/token/photo/{token}", description="Get a photo by its duplicate token", responses=photo_get_token_responses) + @app.get( + "/token/photo/{token}", + description="Get a photo by its duplicate token", + responses=photo_get_token_responses, + ) async def photo_get_token(token: str, id: int): - db_entry = col_tokens.find_one({"short": token}) if db_entry is None: @@ -183,188 +247,335 @@ if configGet("media_token_access") is True: except (JWTError, ValidationError) as exp: print(exp, flush=True) raise AccessTokenInvalidError() - + user = get_user(user=token_data.user) if id not in payload.get("allowed", []): raise AccessTokenInvalidError() try: - image = col_photos.find_one( {"_id": ObjectId(id)} ) + image = col_photos.find_one({"_id": ObjectId(id)}) if image is None: raise InvalidId(id) except InvalidId: raise PhotoNotFoundError(id) - image_path = path.join("data", "users", user.user, "albums", image["album"], image["filename"]) + image_path = path.join( + "data", "users", user.user, "albums", image["album"], image["filename"] + ) mime = Magic(mime=True).from_file(image_path) - with open(image_path, "rb") as f: image_file = f.read() + with open(image_path, "rb") as f: + image_file = f.read() return Response(image_file, media_type=mime) -photo_get_responses = { - 404: PhotoNotFoundError("id").openapi -} -@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses) -async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.read"])): +photo_get_responses = {404: PhotoNotFoundError("id").openapi} + + +@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses) +async def photo_get( + id: str, + current_user: User = Security(get_current_active_user, scopes=["photos.read"]), +): try: - image = col_photos.find_one( {"_id": ObjectId(id)} ) + image = col_photos.find_one({"_id": ObjectId(id)}) if image is None: raise InvalidId(id) except InvalidId: raise PhotoNotFoundError(id) - image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]) + image_path = path.join( + "data", "users", current_user.user, "albums", image["album"], image["filename"] + ) mime = Magic(mime=True).from_file(image_path) - with open(image_path, "rb") as f: image_file = f.read() + with open(image_path, "rb") as f: + image_file = f.read() return Response(image_file, media_type=mime) -photo_move_responses = { - 404: PhotoNotFoundError("id").openapi -} -@app.put("/photos/{id}", description="Move a photo to another album", response_model=PhotoPublic, responses=photo_move_responses) -async def photo_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): +photo_move_responses = {404: PhotoNotFoundError("id").openapi} + + +@app.put( + "/photos/{id}", + description="Move a photo to another album", + response_model=PhotoPublic, + responses=photo_move_responses, +) +async def photo_move( + id: str, + album: str, + current_user: User = Security(get_current_active_user, scopes=["photos.write"]), +): try: - image = col_photos.find_one( {"_id": ObjectId(id)} ) + image = col_photos.find_one({"_id": ObjectId(id)}) if image is None: raise InvalidId(id) except InvalidId: raise PhotoNotFoundError(id) - if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: + if col_albums.find_one({"user": current_user.user, "name": album}) is None: raise AlbumNameNotFoundError(album) - if path.exists(path.join("data", "users", current_user.user, "albums", album, image["filename"])): + if path.exists( + path.join( + "data", "users", current_user.user, "albums", album, image["filename"] + ) + ): base_name = image["filename"].split(".")[:-1] extension = image["filename"].split(".")[-1] - filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension + filename = ( + ".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension + ) else: filename = image["filename"] - col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}} ) + col_photos.find_one_and_update( + {"_id": ObjectId(id)}, + { + "$set": { + "album": album, + "filename": filename, + "dates.modified": datetime.now(tz=timezone.utc), + } + }, + ) move( - path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]), - path.join("data", "users", current_user.user, "albums", album, filename) + path.join( + "data", + "users", + current_user.user, + "albums", + image["album"], + image["filename"], + ), + path.join("data", "users", current_user.user, "albums", album, filename), ) return UJSONResponse( { "id": image["_id"].__str__(), "caption": image["caption"], - "filename": filename + "filename": filename, } ) -photo_patch_responses = { - 404: PhotoNotFoundError("id").openapi -} -@app.patch("/photos/{id}", description="Change properties of a photo", response_model=PhotoPublic, responses=photo_patch_responses) -async def photo_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): +photo_patch_responses = {404: PhotoNotFoundError("id").openapi} + + +@app.patch( + "/photos/{id}", + description="Change properties of a photo", + response_model=PhotoPublic, + responses=photo_patch_responses, +) +async def photo_patch( + id: str, + caption: str, + current_user: User = Security(get_current_active_user, scopes=["photos.write"]), +): try: - image = col_photos.find_one( {"_id": ObjectId(id)} ) + image = col_photos.find_one({"_id": ObjectId(id)}) if image is None: raise InvalidId(id) except InvalidId: raise PhotoNotFoundError(id) - col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} ) + col_photos.find_one_and_update( + {"_id": ObjectId(id)}, + {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}}, + ) return UJSONResponse( { "id": image["_id"].__str__(), "caption": caption, - "filename": image["filename"] + "filename": image["filename"], } ) -photo_delete_responses = { - 404: PhotoNotFoundError("id").openapi -} -@app.delete("/photos/{id}", description="Delete a photo by id", status_code=HTTP_204_NO_CONTENT, responses=photo_delete_responses) -async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): +photo_delete_responses = {404: PhotoNotFoundError("id").openapi} + + +@app.delete( + "/photos/{id}", + description="Delete a photo by id", + status_code=HTTP_204_NO_CONTENT, + responses=photo_delete_responses, +) +async def photo_delete( + id: str, + current_user: User = Security(get_current_active_user, scopes=["photos.write"]), +): try: - image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} ) + image = col_photos.find_one_and_delete({"_id": ObjectId(id)}) if image is None: raise InvalidId(id) except InvalidId: raise PhotoNotFoundError(id) - album = col_albums.find_one( {"name": image["album"]} ) + album = col_albums.find_one({"name": image["album"]}) if album is not None and album["cover"] == image["_id"].__str__(): - col_albums.update_one( {"name": image["album"]}, {"$set": {"cover": None}} ) + col_albums.update_one({"name": image["album"]}, {"$set": {"cover": None}}) - remove(path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])) + remove( + path.join( + "data", + "users", + current_user.user, + "albums", + image["album"], + image["filename"], + ) + ) return Response(status_code=HTTP_204_NO_CONTENT) + photo_find_responses = { 400: SearchPageInvalidError().openapi, 404: AlbumNameNotFoundError("name").openapi, - 422: PhotoSearchQueryEmptyError().openapi + 422: PhotoSearchQueryEmptyError().openapi, } -@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_responses) -async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])): - if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: + +@app.get( + "/albums/{album}/photos", + description="Find a photo by filename", + response_class=UJSONResponse, + response_model=SearchResultsPhoto, + responses=photo_find_responses, +) +async def photo_find( + album: str, + q: Union[str, None] = None, + caption: Union[str, None] = None, + page: int = 1, + page_size: int = 100, + lat: Union[float, None] = None, + lng: Union[float, None] = None, + radius: Union[int, None] = None, + current_user: User = Security(get_current_active_user, scopes=["photos.list"]), +): + if col_albums.find_one({"user": current_user.user, "name": album}) is None: raise AlbumNameNotFoundError(album) if page <= 0 or page_size <= 0: raise SearchPageInvalidError() output = {"results": []} - skip = (page-1)*page_size + skip = (page - 1) * page_size radius = 5000 if radius is None else radius if (lat is not None) and (lng is not None): - db_query = {"user": current_user.user, "album": album, "location": { "$nearSphere": {"$geometry": {"type": "Point", "coordinates": [lng, lat]}, "$maxDistance": radius} } } - db_query_count = {"user": current_user.user, "album": album, "location": { "$geoWithin": { "$centerSphere": [ [lng, lat], radius ] } } } + db_query = { + "user": current_user.user, + "album": album, + "location": { + "$nearSphere": { + "$geometry": {"type": "Point", "coordinates": [lng, lat]}, + "$maxDistance": radius, + } + }, + } + db_query_count = { + "user": current_user.user, + "album": album, + "location": {"$geoWithin": {"$centerSphere": [[lng, lat], radius]}}, + } elif q is None and caption is None: raise PhotoSearchQueryEmptyError() elif q is None and caption is not None: - db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)} - db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(caption)} + db_query = { + "user": current_user.user, + "album": album, + "caption": re.compile(caption), + } + db_query_count = { + "user": current_user.user, + "album": album, + "caption": re.compile(caption), + } elif q is not None and caption is None: - db_query = {"user": current_user.user, "album": album, "filename": re.compile(q)} - db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q)} + db_query = { + "user": current_user.user, + "album": album, + "filename": re.compile(q), + } + db_query_count = { + "user": current_user.user, + "album": album, + "filename": re.compile(q), + } else: - db_query = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore - db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore + db_query = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore + db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore - images = list(col_photos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING)) + images = list( + col_photos.find(db_query, limit=page_size, skip=skip).sort( + "dates.uploaded", DESCENDING + ) + ) for image in images: - output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"], "caption": image["caption"]}) + output["results"].append( + { + "id": image["_id"].__str__(), + "filename": image["filename"], + "caption": image["caption"], + } + ) - if col_photos.count_documents( db_query_count ) > page*page_size: + if col_photos.count_documents(db_query_count) > page * page_size: token = str(token_urlsafe(32)) - col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} ) - output["next_page"] = f"/albums/{album}/photos/token?token={token}" # type: ignore + col_tokens.insert_one( + { + "token": token, + "query": q, + "album": album, + "page": page + 1, + "page_size": page_size, + "user": pickle.dumps(current_user), + } + ) + output["next_page"] = f"/albums/{album}/photos/token?token={token}" # type: ignore else: - output["next_page"] = None # type: ignore + output["next_page"] = None # type: ignore return UJSONResponse(output) -photo_find_token_responses = { - 401: SearchTokenInvalidError().openapi -} -@app.get("/albums/{album}/photos/token", description="Find a photo by token", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_token_responses) -async def photo_find_token(token: str): - found_record = col_tokens.find_one( {"token": token} ) +photo_find_token_responses = {401: SearchTokenInvalidError().openapi} + + +@app.get( + "/albums/{album}/photos/token", + description="Find a photo by token", + response_class=UJSONResponse, + response_model=SearchResultsPhoto, + responses=photo_find_token_responses, +) +async def photo_find_token(token: str): + found_record = col_tokens.find_one({"token": token}) if found_record is None: raise SearchTokenInvalidError() - return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"])) \ No newline at end of file + return await photo_find( + q=found_record["query"], + album=found_record["album"], + page=found_record["page"], + page_size=found_record["page_size"], + current_user=pickle.loads(found_record["user"]), + ) diff --git a/extensions/security.py b/extensions/security.py index 68f382f..8590fc6 100644 --- a/extensions/security.py +++ b/extensions/security.py @@ -11,12 +11,12 @@ from modules.security import ( ACCESS_TOKEN_EXPIRE_DAYS, Token, authenticate_user, - create_access_token + create_access_token, ) -token_post_responses = { - 401: UserCredentialsInvalid().openapi -} +token_post_responses = {401: UserCredentialsInvalid().openapi} + + @app.post("/token", response_model=Token, responses=token_post_responses) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): user = authenticate_user(form_data.username, form_data.password) @@ -27,4 +27,4 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends( data={"sub": user.user, "scopes": form_data.scopes}, expires_delta=access_token_expires, ) - return {"access_token": access_token, "token_type": "bearer"} \ No newline at end of file + return {"access_token": access_token, "token_type": "bearer"} diff --git a/extensions/users.py b/extensions/users.py index 0bdaf76..81820df 100644 --- a/extensions/users.py +++ b/extensions/users.py @@ -1,6 +1,17 @@ from datetime import datetime, timedelta -from classes.exceptions import UserAlreadyExists, UserCredentialsInvalid, UserEmailCodeInvalid -from modules.database import col_users, col_albums, col_photos, col_emails, col_videos, col_emails +from classes.exceptions import ( + UserAlreadyExists, + UserCredentialsInvalid, + UserEmailCodeInvalid, +) +from modules.database import ( + col_users, + col_albums, + col_photos, + col_emails, + col_videos, + col_emails, +) from modules.app import app from modules.utils import configGet, logWrite from modules.scheduler import scheduler @@ -16,7 +27,7 @@ from modules.security import ( get_current_active_user, get_password_hash, get_user, - verify_password + verify_password, ) @@ -26,9 +37,17 @@ async def send_confirmation(user: str, email: str): mail_sender.sendmail( from_addr=configGet("sender", "mailer", "smtp"), to_addrs=email, - msg=f'From: {configGet("sender", "mailer", "smtp")}\nSubject: Email confirmation\n\n'+configGet("message", "mailer", "messages", "registration_confirmation").format(configGet("external_address")+f"/users/{user}/confirm?code={confirmation_code}") + msg=f'From: {configGet("sender", "mailer", "smtp")}\nSubject: Email confirmation\n\n' + + configGet( + "message", "mailer", "messages", "registration_confirmation" + ).format( + configGet("external_address") + + f"/users/{user}/confirm?code={confirmation_code}" + ), + ) + col_emails.insert_one( + {"user": user, "email": email, "used": False, "code": confirmation_code} ) - col_emails.insert_one( {"user": user, "email": email, "used": False, "code": confirmation_code} ) logWrite(f"Sent confirmation email to '{email}' with code {confirmation_code}") except Exception as exp: logWrite(f"Could not send confirmation email to '{email}' due to: {exp}") @@ -44,51 +63,85 @@ user_confirm_responses = { "description": "Successful Response", "content": { "application/json": { - "example": { - "detail": configGet("email_confirmed", "messages") - } + "example": {"detail": configGet("email_confirmed", "messages")} } - } + }, }, - 400: UserEmailCodeInvalid().openapi + 400: UserEmailCodeInvalid().openapi, } if configGet("registration_requires_confirmation") is True: - @app.get("/users/{user}/confirm", response_class=UJSONResponse, responses=user_confirm_responses) - @app.patch("/users/{user}/confirm", response_class=UJSONResponse, responses=user_confirm_responses) + + @app.get( + "/users/{user}/confirm", + response_class=UJSONResponse, + responses=user_confirm_responses, + ) + @app.patch( + "/users/{user}/confirm", + response_class=UJSONResponse, + responses=user_confirm_responses, + ) async def user_confirm(user: str, code: str): - confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} ) + confirm_record = col_emails.find_one( + {"user": user, "code": code, "used": False} + ) if confirm_record is None: raise UserEmailCodeInvalid() - col_emails.find_one_and_update( {"_id": confirm_record["_id"]}, {"$set": {"used": True}} ) - col_users.find_one_and_update( {"user": confirm_record["user"]}, {"$set": {"disabled": False}} ) - return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} ) + col_emails.find_one_and_update( + {"_id": confirm_record["_id"]}, {"$set": {"used": True}} + ) + col_users.find_one_and_update( + {"user": confirm_record["user"]}, {"$set": {"disabled": False}} + ) + return UJSONResponse({"detail": configGet("email_confirmed", "messages")}) -user_create_responses = { - 409: UserAlreadyExists().openapi -} + +user_create_responses = {409: UserAlreadyExists().openapi} if configGet("registration_enabled") is True: - @app.post("/users", status_code=HTTP_204_NO_CONTENT, responses=user_create_responses) - async def user_create(user: str = Form(), email: str = Form(), password: str = Form()): - if col_users.find_one( {"user": user} ) is not None: + + @app.post( + "/users", status_code=HTTP_204_NO_CONTENT, responses=user_create_responses + ) + async def user_create( + user: str = Form(), email: str = Form(), password: str = Form() + ): + if col_users.find_one({"user": user}) is not None: raise UserAlreadyExists() - col_users.insert_one( {"user": user, "email": email, "hash": get_password_hash(password), "disabled": configGet("registration_requires_confirmation")} ) + col_users.insert_one( + { + "user": user, + "email": email, + "hash": get_password_hash(password), + "disabled": configGet("registration_requires_confirmation"), + } + ) if configGet("registration_requires_confirmation") is True: - scheduler.add_job( send_confirmation, trigger="date", run_date=datetime.now()+timedelta(seconds=1), kwargs={"user": user, "email": email} ) + scheduler.add_job( + send_confirmation, + trigger="date", + run_date=datetime.now() + timedelta(seconds=1), + kwargs={"user": user, "email": email}, + ) return Response(status_code=HTTP_204_NO_CONTENT) -user_delete_responses = { - 401: UserCredentialsInvalid().openapi -} -@app.delete("/users/me/", status_code=HTTP_204_NO_CONTENT, responses=user_delete_responses) -async def user_delete(password: str = Form(), current_user: User = Depends(get_current_active_user)): + +user_delete_responses = {401: UserCredentialsInvalid().openapi} + + +@app.delete( + "/users/me/", status_code=HTTP_204_NO_CONTENT, responses=user_delete_responses +) +async def user_delete( + password: str = Form(), current_user: User = Depends(get_current_active_user) +): user = get_user(current_user.user) if not user: return False if not verify_password(password, user.hash): raise UserCredentialsInvalid() - col_users.delete_many( {"user": current_user.user} ) - col_emails.delete_many( {"user": current_user.user} ) - col_photos.delete_many( {"user": current_user.user} ) - col_videos.delete_many( {"user": current_user.user} ) - col_albums.delete_many( {"user": current_user.user} ) - return Response(status_code=HTTP_204_NO_CONTENT) \ No newline at end of file + col_users.delete_many({"user": current_user.user}) + col_emails.delete_many({"user": current_user.user}) + col_photos.delete_many({"user": current_user.user}) + col_videos.delete_many({"user": current_user.user}) + col_albums.delete_many({"user": current_user.user}) + return Response(status_code=HTTP_204_NO_CONTENT) diff --git a/extensions/videos.py b/extensions/videos.py index 7a00abe..689b791 100644 --- a/extensions/videos.py +++ b/extensions/videos.py @@ -6,7 +6,13 @@ from typing import Union from magic import Magic from datetime import datetime, timezone from os import makedirs, path, remove -from classes.exceptions import AlbumNameNotFoundError, SearchPageInvalidError, SearchTokenInvalidError, VideoNotFoundError, VideoSearchQueryEmptyError +from classes.exceptions import ( + AlbumNameNotFoundError, + SearchPageInvalidError, + SearchTokenInvalidError, + VideoNotFoundError, + VideoSearchQueryEmptyError, +) from classes.models import Video, SearchResultsVideo, VideoPublic from modules.security import User, get_current_active_user from modules.app import app @@ -19,29 +25,47 @@ from fastapi import UploadFile, Security from fastapi.responses import UJSONResponse, Response from starlette.status import HTTP_204_NO_CONTENT -video_post_responses = { - 404: AlbumNameNotFoundError("name").openapi -} -@app.post("/albums/{album}/videos", description="Upload a video to album", response_class=UJSONResponse, response_model=Video, responses=video_post_responses) -async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): +video_post_responses = {404: AlbumNameNotFoundError("name").openapi} - if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: + +@app.post( + "/albums/{album}/videos", + description="Upload a video to album", + response_class=UJSONResponse, + response_model=Video, + responses=video_post_responses, +) +async def video_upload( + file: UploadFile, + album: str, + caption: Union[str, None] = None, + current_user: User = Security(get_current_active_user, scopes=["videos.write"]), +): + if col_albums.find_one({"user": current_user.user, "name": album}) is None: raise AlbumNameNotFoundError(album) - makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True) + makedirs( + path.join("data", "users", current_user.user, "albums", album), exist_ok=True + ) filename = file.filename - if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)): + if path.exists( + path.join("data", "users", current_user.user, "albums", album, file.filename) + ): base_name = file.filename.split(".")[:-1] extension = file.filename.split(".")[-1] - filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension + filename = ( + ".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension + ) - with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f: + with open( + path.join("data", "users", current_user.user, "albums", album, filename), "wb" + ) as f: f.write(await file.read()) # Hashing and duplicates check should be here - + # Coords extraction should be here uploaded = col_videos.insert_one( @@ -51,9 +75,9 @@ async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = "filename": filename, "dates": { "uploaded": datetime.now(tz=timezone.utc), - "modified": datetime.now(tz=timezone.utc) + "modified": datetime.now(tz=timezone.utc), }, - "caption": caption + "caption": caption, } ) @@ -61,165 +85,295 @@ async def video_upload(file: UploadFile, album: str, caption: Union[str, None] = { "id": uploaded.inserted_id.__str__(), "album": album, - "hash": "", # SHOULD BE DONE - "filename": filename + "hash": "", # SHOULD BE DONE + "filename": filename, } ) -video_get_responses = { - 404: VideoNotFoundError("id").openapi -} -@app.get("/videos/{id}", description="Get a video by id", responses=video_get_responses) -async def video_get(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.read"])): +video_get_responses = {404: VideoNotFoundError("id").openapi} + + +@app.get("/videos/{id}", description="Get a video by id", responses=video_get_responses) +async def video_get( + id: str, + current_user: User = Security(get_current_active_user, scopes=["videos.read"]), +): try: - video = col_videos.find_one( {"_id": ObjectId(id)} ) + video = col_videos.find_one({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId: raise VideoNotFoundError(id) - video_path = path.join("data", "users", current_user.user, "albums", video["album"], video["filename"]) + video_path = path.join( + "data", "users", current_user.user, "albums", video["album"], video["filename"] + ) mime = Magic(mime=True).from_file(video_path) - with open(video_path, "rb") as f: video_file = f.read() + with open(video_path, "rb") as f: + video_file = f.read() return Response(video_file, media_type=mime) -video_move_responses = { - 404: VideoNotFoundError("id").openapi -} -@app.put("/videos/{id}", description="Move a video into another album", response_model=VideoPublic, responses=video_move_responses) -async def video_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): +video_move_responses = {404: VideoNotFoundError("id").openapi} + + +@app.put( + "/videos/{id}", + description="Move a video into another album", + response_model=VideoPublic, + responses=video_move_responses, +) +async def video_move( + id: str, + album: str, + current_user: User = Security(get_current_active_user, scopes=["videos.write"]), +): try: - video = col_videos.find_one( {"_id": ObjectId(id)} ) + video = col_videos.find_one({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId: raise VideoNotFoundError(id) - if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: + if col_albums.find_one({"user": current_user.user, "name": album}) is None: raise AlbumNameNotFoundError(album) - if path.exists(path.join("data", "users", current_user.user, "albums", album, video["filename"])): + if path.exists( + path.join( + "data", "users", current_user.user, "albums", album, video["filename"] + ) + ): base_name = video["filename"].split(".")[:-1] extension = video["filename"].split(".")[-1] - filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension + filename = ( + ".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension + ) else: filename = video["filename"] - col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}} ) + col_videos.find_one_and_update( + {"_id": ObjectId(id)}, + { + "$set": { + "album": album, + "filename": filename, + "dates.modified": datetime.now(tz=timezone.utc), + } + }, + ) move( - path.join("data", "users", current_user.user, "albums", video["album"], video["filename"]), - path.join("data", "users", current_user.user, "albums", album, filename) + path.join( + "data", + "users", + current_user.user, + "albums", + video["album"], + video["filename"], + ), + path.join("data", "users", current_user.user, "albums", album, filename), ) return UJSONResponse( { "id": video["_id"].__str__(), "caption": video["caption"], - "filename": filename + "filename": filename, } ) -video_patch_responses = { - 404: VideoNotFoundError("id").openapi -} -@app.patch("/videos/{id}", description="Change properties of a video", response_model=VideoPublic, responses=video_patch_responses) -async def video_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): +video_patch_responses = {404: VideoNotFoundError("id").openapi} + + +@app.patch( + "/videos/{id}", + description="Change properties of a video", + response_model=VideoPublic, + responses=video_patch_responses, +) +async def video_patch( + id: str, + caption: str, + current_user: User = Security(get_current_active_user, scopes=["videos.write"]), +): try: - video = col_videos.find_one( {"_id": ObjectId(id)} ) + video = col_videos.find_one({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId: raise VideoNotFoundError(id) - col_videos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} ) + col_videos.find_one_and_update( + {"_id": ObjectId(id)}, + {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}}, + ) return UJSONResponse( { "id": video["_id"].__str__(), "caption": video["caption"], - "filename": video["filename"] + "filename": video["filename"], } ) -video_delete_responses = { - 404: VideoNotFoundError("id").openapi -} -@app.delete("/videos/{id}", description="Delete a video by id", status_code=HTTP_204_NO_CONTENT, responses=video_delete_responses) -async def video_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["videos.write"])): +video_delete_responses = {404: VideoNotFoundError("id").openapi} + + +@app.delete( + "/videos/{id}", + description="Delete a video by id", + status_code=HTTP_204_NO_CONTENT, + responses=video_delete_responses, +) +async def video_delete( + id: str, + current_user: User = Security(get_current_active_user, scopes=["videos.write"]), +): try: - video = col_videos.find_one_and_delete( {"_id": ObjectId(id)} ) + video = col_videos.find_one_and_delete({"_id": ObjectId(id)}) if video is None: raise InvalidId(id) except InvalidId: raise VideoNotFoundError(id) - album = col_albums.find_one( {"name": video["album"]} ) + album = col_albums.find_one({"name": video["album"]}) - remove(path.join("data", "users", current_user.user, "albums", video["album"], video["filename"])) + remove( + path.join( + "data", + "users", + current_user.user, + "albums", + video["album"], + video["filename"], + ) + ) return Response(status_code=HTTP_204_NO_CONTENT) + video_find_responses = { 400: SearchPageInvalidError().openapi, 404: AlbumNameNotFoundError("name").openapi, - 422: VideoSearchQueryEmptyError().openapi + 422: VideoSearchQueryEmptyError().openapi, } -@app.get("/albums/{album}/videos", description="Find a video by filename", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_responses) -async def video_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, current_user: User = Security(get_current_active_user, scopes=["videos.list"])): - if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: + +@app.get( + "/albums/{album}/videos", + description="Find a video by filename", + response_class=UJSONResponse, + response_model=SearchResultsVideo, + responses=video_find_responses, +) +async def video_find( + album: str, + q: Union[str, None] = None, + caption: Union[str, None] = None, + page: int = 1, + page_size: int = 100, + current_user: User = Security(get_current_active_user, scopes=["videos.list"]), +): + if col_albums.find_one({"user": current_user.user, "name": album}) is None: raise AlbumNameNotFoundError(album) if page <= 0 or page_size <= 0: raise SearchPageInvalidError() output = {"results": []} - skip = (page-1)*page_size + skip = (page - 1) * page_size if q is None and caption is None: raise VideoSearchQueryEmptyError() - - if q is None and caption is not None: - db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)} - db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(caption)} - elif q is not None and caption is None: - db_query = list(col_videos.find({"user": current_user.user, "album": album, "filename": re.compile(q)}, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING)) - db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(q)} - else: - db_query = list(col_videos.find({"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)}, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING)) # type: ignore - db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore - videos = list(col_videos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING)) + if q is None and caption is not None: + db_query = { + "user": current_user.user, + "album": album, + "caption": re.compile(caption), + } + db_query_count = { + "user": current_user.user, + "album": album, + "caption": re.compile(caption), + } + elif q is not None and caption is None: + db_query = list( + col_videos.find( + {"user": current_user.user, "album": album, "filename": re.compile(q)}, + limit=page_size, + skip=skip, + ).sort("dates.uploaded", DESCENDING) + ) + db_query_count = { + "user": current_user.user, + "album": album, + "caption": re.compile(q), + } + else: + db_query = list(col_videos.find({"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)}, limit=page_size, skip=skip).sort("dates.uploaded", DESCENDING)) # type: ignore + db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore + + videos = list( + col_videos.find(db_query, limit=page_size, skip=skip).sort( + "dates.uploaded", DESCENDING + ) + ) for video in videos: - output["results"].append({"id": video["_id"].__str__(), "filename": video["filename"], "caption": video["caption"]}) + output["results"].append( + { + "id": video["_id"].__str__(), + "filename": video["filename"], + "caption": video["caption"], + } + ) - if col_videos.count_documents( db_query_count ) > page*page_size: + if col_videos.count_documents(db_query_count) > page * page_size: token = str(token_urlsafe(32)) - col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} ) - output["next_page"] = f"/albums/{album}/videos/token?token={token}" # type: ignore + col_tokens.insert_one( + { + "token": token, + "query": q, + "album": album, + "page": page + 1, + "page_size": page_size, + "user": pickle.dumps(current_user), + } + ) + output["next_page"] = f"/albums/{album}/videos/token?token={token}" # type: ignore else: - output["next_page"] = None # type: ignore + output["next_page"] = None # type: ignore return UJSONResponse(output) -video_find_token_responses = { - 401: SearchTokenInvalidError().openapi -} -@app.get("/albums/{album}/videos/token", description="Find a video by token", response_class=UJSONResponse, response_model=SearchResultsVideo, responses=video_find_token_responses) -async def video_find_token(token: str): - found_record = col_tokens.find_one( {"token": token} ) +video_find_token_responses = {401: SearchTokenInvalidError().openapi} + + +@app.get( + "/albums/{album}/videos/token", + description="Find a video by token", + response_class=UJSONResponse, + response_model=SearchResultsVideo, + responses=video_find_token_responses, +) +async def video_find_token(token: str): + found_record = col_tokens.find_one({"token": token}) if found_record is None: raise SearchTokenInvalidError() - return await video_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"])) \ No newline at end of file + return await video_find( + q=found_record["query"], + album=found_record["album"], + page=found_record["page"], + page_size=found_record["page_size"], + current_user=pickle.loads(found_record["user"]), + ) diff --git a/modules/app.py b/modules/app.py index b02d478..0bd3582 100644 --- a/modules/app.py +++ b/modules/app.py @@ -4,18 +4,20 @@ from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html app = FastAPI(title="END PLAY Photos", docs_url=None, redoc_url=None, version="0.1") + @app.get("/docs", include_in_schema=False) async def custom_swagger_ui_html(): return get_swagger_ui_html( - openapi_url=app.openapi_url, # type: ignore + openapi_url=app.openapi_url, # type: ignore title=app.title + " - Documentation", - swagger_favicon_url="/favicon.ico" + swagger_favicon_url="/favicon.ico", ) + @app.get("/redoc", include_in_schema=False) async def custom_redoc_html(): return get_redoc_html( - openapi_url=app.openapi_url, # type: ignore + openapi_url=app.openapi_url, # type: ignore title=app.title + " - Documentation", - redoc_favicon_url="/favicon.ico" - ) \ No newline at end of file + redoc_favicon_url="/favicon.ico", + ) diff --git a/modules/database.py b/modules/database.py index aefc193..46a18e4 100644 --- a/modules/database.py +++ b/modules/database.py @@ -4,18 +4,16 @@ from pymongo import MongoClient, GEOSPHERE db_config = configGet("database") if db_config["user"] is not None and db_config["password"] is not None: - con_string = 'mongodb://{0}:{1}@{2}:{3}/{4}'.format( + con_string = "mongodb://{0}:{1}@{2}:{3}/{4}".format( db_config["user"], db_config["password"], db_config["host"], db_config["port"], - db_config["name"] + db_config["name"], ) else: - con_string = 'mongodb://{0}:{1}/{2}'.format( - db_config["host"], - db_config["port"], - db_config["name"] + con_string = "mongodb://{0}:{1}/{2}".format( + db_config["host"], db_config["port"], db_config["name"] ) db_client = MongoClient(con_string) @@ -35,4 +33,4 @@ col_videos = db.get_collection("videos") col_tokens = db.get_collection("tokens") col_emails = db.get_collection("emails") -col_photos.create_index([("location", GEOSPHERE)]) \ No newline at end of file +col_photos.create_index([("location", GEOSPHERE)]) diff --git a/modules/exif_reader.py b/modules/exif_reader.py index 8f61c43..7c3629c 100644 --- a/modules/exif_reader.py +++ b/modules/exif_reader.py @@ -1,5 +1,6 @@ from exif import Image + def decimal_coords(coords: float, ref: str) -> float: """Get latitude/longitude from coord and direction reference @@ -9,12 +10,13 @@ def decimal_coords(coords: float, ref: str) -> float: ### Returns: * float: Decimal degrees - """ + """ decimal_degrees = coords[0] + coords[1] / 60 + coords[2] / 3600 if ref == "S" or ref == "W": decimal_degrees = -decimal_degrees return round(decimal_degrees, 5) + def extract_location(filepath: str) -> dict: """Get location data from image @@ -23,15 +25,11 @@ def extract_location(filepath: str) -> dict: ### Returns: * dict: `{ "lng": float, "lat": float, "alt": float }` - """ + """ - output = { - "lng": 0.0, - "lat": 0.0, - "alt": 0.0 - } + output = {"lng": 0.0, "lat": 0.0, "alt": 0.0} - with open(filepath, 'rb') as src: + with open(filepath, "rb") as src: img = Image(src) if img.has_exif is False: @@ -44,4 +42,4 @@ def extract_location(filepath: str) -> dict: except AttributeError: pass - return output \ No newline at end of file + return output diff --git a/modules/extensions_loader.py b/modules/extensions_loader.py index 2fcd295..e978fe2 100644 --- a/modules/extensions_loader.py +++ b/modules/extensions_loader.py @@ -1,13 +1,14 @@ from importlib.util import module_from_spec, spec_from_file_location from os import getcwd, path, walk -#================================================================================= +# ================================================================================= + # Import functions # Took from https://stackoverflow.com/a/57892961 def get_py_files(src): - cwd = getcwd() # Current Working directory - py_files = [] + cwd = getcwd() # Current Working directory + py_files = [] for root, dirs, files in walk(src): for file in files: if file.endswith(".py"): @@ -18,18 +19,21 @@ def get_py_files(src): def dynamic_import(module_name, py_path): try: module_spec = spec_from_file_location(module_name, py_path) - module = module_from_spec(module_spec) # type: ignore - module_spec.loader.exec_module(module) # type: ignore + module = module_from_spec(module_spec) # type: ignore + module_spec.loader.exec_module(module) # type: ignore return module except SyntaxError: - print(f"Could not load extension {module_name} due to invalid syntax. Check logs/errors.log for details.", flush=True) + print( + f"Could not load extension {module_name} due to invalid syntax. Check logs/errors.log for details.", + flush=True, + ) return except Exception as exp: print(f"Could not load extension {module_name} due to {exp}", flush=True) return -def dynamic_import_from_src(src, star_import = False): +def dynamic_import_from_src(src, star_import=False): my_py_files = get_py_files(src) for py_file in my_py_files: module_name = path.split(py_file)[-1][:-3] @@ -44,4 +48,5 @@ def dynamic_import_from_src(src, star_import = False): print(f"Successfully loaded {module_name} extension", flush=True) return -#================================================================================= \ No newline at end of file + +# ================================================================================= diff --git a/modules/hasher.py b/modules/hasher.py index 0f7e1ad..3ad56e9 100644 --- a/modules/hasher.py +++ b/modules/hasher.py @@ -4,55 +4,68 @@ from numpy.typing import NDArray from scipy import spatial import cv2 + def hash_array_to_hash_hex(hash_array): - # convert hash array of 0 or 1 to hash string in hex - hash_array = np.array(hash_array, dtype = np.uint8) - hash_str = ''.join(str(i) for i in 1 * hash_array.flatten()) - return (hex(int(hash_str, 2))) + # convert hash array of 0 or 1 to hash string in hex + hash_array = np.array(hash_array, dtype=np.uint8) + hash_str = "".join(str(i) for i in 1 * hash_array.flatten()) + return hex(int(hash_str, 2)) + def hash_hex_to_hash_array(hash_hex) -> NDArray: - # convert hash string in hex to hash values of 0 or 1 - hash_str = int(hash_hex, 16) - array_str = bin(hash_str)[2:] - return np.array([i for i in array_str], dtype = np.float32) + # convert hash string in hex to hash values of 0 or 1 + hash_str = int(hash_hex, 16) + array_str = bin(hash_str)[2:] + return np.array([i for i in array_str], dtype=np.float32) + def get_duplicates_cache(album: str) -> dict: output = {} - for photo in col_photos.find( {"album": album} ): + for photo in col_photos.find({"album": album}): output[photo["filename"]] = [photo["_id"].__str__(), photo["hash"]] return output + async def get_phash(filepath: str) -> str: img = cv2.imread(filepath) # resize image and convert to gray scale img = cv2.resize(img, (64, 64)) img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) - img = np.array(img, dtype = np.float32) - # calculate dct of image + img = np.array(img, dtype=np.float32) + # calculate dct of image dct = cv2.dct(img) - # to reduce hash length take only 8*8 top-left block + # to reduce hash length take only 8*8 top-left block # as this block has more information than the rest - dct_block = dct[: 8, : 8] + dct_block = dct[:8, :8] # caclulate mean of dct block excluding first term i.e, dct(0, 0) - dct_average = (dct_block.mean() * dct_block.size - dct_block[0, 0]) / (dct_block.size - 1) + dct_average = (dct_block.mean() * dct_block.size - dct_block[0, 0]) / ( + dct_block.size - 1 + ) # convert dct block to binary values based on dct_average dct_block[dct_block < dct_average] = 0.0 dct_block[dct_block != 0] = 1.0 # store hash value return hash_array_to_hash_hex(dct_block.flatten()) + async def get_duplicates(hash: str, album: str) -> list: duplicates = [] cache = get_duplicates_cache(album) for image_name in cache.keys(): try: distance = spatial.distance.hamming( - hash_hex_to_hash_array(cache[image_name][1]), - hash_hex_to_hash_array(hash) + hash_hex_to_hash_array(cache[image_name][1]), + hash_hex_to_hash_array(hash), ) except ValueError: continue # print("{0:<30} {1}".format(image_name, distance), flush=True) if distance <= 0.1: - duplicates.append({"id": cache[image_name][0], "filename": image_name, "difference": distance}) - return duplicates \ No newline at end of file + duplicates.append( + { + "id": cache[image_name][0], + "filename": image_name, + "difference": distance, + } + ) + return duplicates diff --git a/modules/mailer.py b/modules/mailer.py index a0d9428..d637fb6 100644 --- a/modules/mailer.py +++ b/modules/mailer.py @@ -20,8 +20,7 @@ try: logWrite(f"Initialized SMTP TLS connection") else: mail_sender = SMTP( - configGet("host", "mailer", "smtp"), - configGet("port", "mailer", "smtp") + configGet("host", "mailer", "smtp"), configGet("port", "mailer", "smtp") ) mail_sender.ehlo() logWrite(f"Initialized SMTP connection") @@ -31,9 +30,8 @@ except Exception as exp: try: mail_sender.login( - configGet("login", "mailer", "smtp"), - configGet("password", "mailer", "smtp") + configGet("login", "mailer", "smtp"), configGet("password", "mailer", "smtp") ) logWrite(f"Successfully initialized mailer") except Exception as exp: - logWrite(f"Could not login into provided SMTP account due to: {exp}") \ No newline at end of file + logWrite(f"Could not login into provided SMTP account due to: {exp}") diff --git a/modules/scheduler.py b/modules/scheduler.py index 0ec4214..a5eb79d 100644 --- a/modules/scheduler.py +++ b/modules/scheduler.py @@ -1,3 +1,3 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler -scheduler = AsyncIOScheduler() \ No newline at end of file +scheduler = AsyncIOScheduler() diff --git a/modules/security.py b/modules/security.py index 4ccd324..8703ce6 100644 --- a/modules/security.py +++ b/modules/security.py @@ -52,7 +52,7 @@ oauth2_scheme = OAuth2PasswordBearer( "photos.write": "Modify photos.", "videos.list": "List videos.", "videos.read": "View videos.", - "videos.write": "Modify videos." + "videos.write": "Modify videos.", }, ) @@ -66,8 +66,13 @@ def get_password_hash(password): def get_user(user: str): - found_user = col_users.find_one( {"user": user} ) - return UserInDB(user=found_user["user"], email=found_user["email"], disabled=found_user["disabled"], hash=found_user["hash"]) + found_user = col_users.find_one({"user": user}) + return UserInDB( + user=found_user["user"], + email=found_user["email"], + disabled=found_user["disabled"], + hash=found_user["hash"], + ) def authenticate_user(user_name: str, password: str): @@ -79,19 +84,22 @@ def authenticate_user(user_name: str, password: str): return user -def create_access_token( data: dict, expires_delta: Union[timedelta, None] = None ): +def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.now(tz=timezone.utc) + expires_delta else: - expire = datetime.now(tz=timezone.utc) + timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) + expire = datetime.now(tz=timezone.utc) + timedelta( + days=ACCESS_TOKEN_EXPIRE_DAYS + ) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt -async def get_current_user( security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) ): - +async def get_current_user( + security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme) +): if security_scopes.scopes: authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' else: @@ -112,7 +120,7 @@ async def get_current_user( security_scopes: SecurityScopes, token: str = Depend token_data = TokenData(scopes=token_scopes, user=user) except (JWTError, ValidationError): raise credentials_exception - + user = get_user(user=token_data.user) if user is None: @@ -128,7 +136,9 @@ async def get_current_user( security_scopes: SecurityScopes, token: str = Depend return user -async def get_current_active_user( current_user: User = Security(get_current_user, scopes=["me"]) ): +async def get_current_active_user( + current_user: User = Security(get_current_user, scopes=["me"]) +): if current_user.disabled: raise HTTPException(status_code=400, detail="Inactive user") - return current_user \ No newline at end of file + return current_user diff --git a/modules/utils.py b/modules/utils.py index 4119c07..7163ffa 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -2,12 +2,14 @@ from typing import Any, Union from ujson import loads, dumps, JSONDecodeError from traceback import print_exc + # Print to stdout and then to log def logWrite(message: str, debug: bool = False) -> None: # save to log file and rotation is to be done # logAppend(f'{message}', debug=debug) print(f"{message}", flush=True) + def jsonLoad(filepath: str) -> Any: """Load json file @@ -16,34 +18,40 @@ def jsonLoad(filepath: str) -> Any: ### Returns: * `Any`: Some json deserializable - """ - with open(filepath, "r", encoding='utf8') as file: + """ + with open(filepath, "r", encoding="utf8") as file: try: output = loads(file.read()) except JSONDecodeError: - logWrite(f"Could not load json file {filepath}: file seems to be incorrect!\n{print_exc()}") + logWrite( + f"Could not load json file {filepath}: file seems to be incorrect!\n{print_exc()}" + ) raise except FileNotFoundError: - logWrite(f"Could not load json file {filepath}: file does not seem to exist!\n{print_exc()}") + logWrite( + f"Could not load json file {filepath}: file does not seem to exist!\n{print_exc()}" + ) raise file.close() return output + def jsonSave(contents: Union[list, dict], filepath: str) -> None: """Save contents into json file ### Args: * contents (`Union[list, dict]`): Some json serializable * filepath (`str`): Path to output file - """ + """ try: - with open(filepath, "w", encoding='utf8') as file: + with open(filepath, "w", encoding="utf8") as file: file.write(dumps(contents, ensure_ascii=False, indent=4)) file.close() except Exception as exp: logWrite(f"Could not save json file {filepath}: {exp}\n{print_exc()}") return + def configGet(key: str, *args: str) -> Any: """Get value of the config key @@ -53,23 +61,25 @@ def configGet(key: str, *args: str) -> Any: ### Returns: * `Any`: Value of provided key - """ + """ this_dict = jsonLoad("config.json") this_key = this_dict for dict_key in args: this_key = this_key[dict_key] return this_key[key] + def apiKeyInvalid(obj): obj.send_response(401) - obj.send_header('Content-type', 'application/json; charset=utf-8') + obj.send_header("Content-type", "application/json; charset=utf-8") obj.end_headers() obj.wfile.write(b'{"code":401, "message": "Invalid API key"}') return + def apiKeyExpired(obj): obj.send_response(403) - obj.send_header('Content-type', 'application/json; charset=utf-8') + obj.send_header("Content-type", "application/json; charset=utf-8") obj.end_headers() obj.wfile.write(b'{"code":403, "message": "API key expired"}') - return \ No newline at end of file + return diff --git a/photos_api.py b/photos_api.py index dc84e4a..e359c90 100644 --- a/photos_api.py +++ b/photos_api.py @@ -13,8 +13,8 @@ async def favicon(): return FileResponse("favicon.ico") -#================================================================================= -dynamic_import_from_src("extensions", star_import = True) -#================================================================================= +# ================================================================================= +dynamic_import_from_src("extensions", star_import=True) +# ================================================================================= -scheduler.start() \ No newline at end of file +scheduler.start()