import asyncio
import re
import pickle
from secrets import token_urlsafe
from shutil import move
from threading import Thread
from typing import Union
from uuid import uuid4
from magic import Magic
from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system

from pydantic import ValidationError

from classes.exceptions import (
    AccessTokenInvalidError,
    AlbumNameNotFoundError,
    PhotoNotFoundError,
    PhotoSearchQueryEmptyError,
    SearchPageInvalidError,
    SearchTokenInvalidError,
)
from classes.models import Photo, PhotoPublic, SearchResultsPhoto
from modules.exif_reader import extract_location
from modules.hasher import get_phash, get_duplicates
from modules.scheduler import scheduler
from modules.security import (
    ALGORITHM,
    SECRET_KEY,
    TokenData,
    User,
    create_access_token,
    get_current_active_user,
    get_user,
)
from modules.app import app
from modules.database import col_photos, col_albums, col_tokens
from pymongo import DESCENDING
from bson.objectid import ObjectId
from bson.errors import InvalidId
from plum.exceptions import UnpackError
from jose import JWTError, jwt

from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response
from fastapi.exceptions import HTTPException
from starlette.status import (
    HTTP_204_NO_CONTENT,
    HTTP_401_UNAUTHORIZED,
    HTTP_409_CONFLICT,
)

from modules.utils import configGet, logWrite

# Mean equatorial radius of the Earth, used to convert a distance in meters
# into the radians that MongoDB's $centerSphere operator expects.
_EARTH_RADIUS_METERS = 6378100.0


def _album_dir(user: str, album: str) -> str:
    """Return the on-disk directory that stores *album* of *user*."""
    return path.join("data", "users", user, "albums", album)


def _safe_filename(raw_name: Union[str, None]) -> str:
    """Reduce a client-supplied file name to a single safe path component.

    Directory parts are dropped so that names such as ``../../x`` cannot
    escape the album directory. Empty names and the special entries ``.``
    and ``..`` are replaced with a random hex name.
    """
    name = path.basename(raw_name or "")
    if name in ("", ".", ".."):
        return uuid4().hex
    return name


def _unique_filename(directory: str, filename: str) -> str:
    """Return *filename*, or a timestamp-suffixed variant if it already exists.

    The suffix is inserted before the extension (``a.jpg`` -> ``a_<ts>.jpg``);
    names without an extension simply get ``_<ts>`` appended.
    """
    if not path.exists(path.join(directory, filename)):
        return filename
    stem, extension = path.splitext(filename)
    return f"{stem}_{int(datetime.now().timestamp())}{extension}"


def _parse_photo_id(photo_id: str) -> ObjectId:
    """Convert *photo_id* to an ObjectId, raising PhotoNotFoundError if malformed."""
    try:
        return ObjectId(photo_id)
    except InvalidId as exc:
        raise PhotoNotFoundError(photo_id) from exc


def _find_photo_or_404(photo_id: str, user: str) -> dict:
    """Fetch the photo document *photo_id* owned by *user*.

    Raises:
        PhotoNotFoundError: if the id is malformed, unknown, or the photo
            belongs to a different user.
    """
    image = col_photos.find_one({"_id": _parse_photo_id(photo_id), "user": user})
    if image is None:
        raise PhotoNotFoundError(photo_id)
    return image


def _compile_search_pattern(pattern: str) -> "re.Pattern":
    """Compile a user-supplied regex, answering 422 instead of crashing on bad input."""
    try:
        return re.compile(pattern)
    except re.error as exc:
        raise HTTPException(
            status_code=422, detail=f"Invalid search pattern: {exc}"
        ) from exc


async def compress_image(image_path: str):
    """Losslessly/lossily compress a JPEG or PNG file in place.

    JPEG files are processed with ``jpegoptim`` and PNG files with
    ``optipng``; any other mime type is logged and skipped. The external
    tool is started without a shell (so the path is never interpreted as
    shell syntax) and awaited asynchronously so the event loop is not
    blocked while it runs.

    Args:
        image_path: Path of the image to compress.
    """
    image_type = Magic(mime=True).from_file(image_path)

    if image_type == "image/jpeg":
        command = ["jpegoptim", image_path, "-o", "--max=55", "-p", "--strip-none"]
    elif image_type == "image/png":
        command = ["optipng", "-o3", image_path]
    else:
        logWrite(f"Not compressing {image_path} because its mime is '{image_type}'")
        return

    image_name = path.basename(image_path)
    size_before = path.getsize(image_path) / 1024

    logWrite(f"Compressing '{image_name}'...")

    try:
        process = await asyncio.create_subprocess_exec(*command)
    except OSError as exc:
        # Typically the compression tool is not installed; keep the original file.
        logWrite(f"Could not compress '{image_name}': {exc}")
        return
    await process.wait()

    size_after = path.getsize(image_path) / 1024
    logWrite(
        f"Compressed '{image_name}' from {size_before} Kb to {size_after} Kb"
    )


photo_post_responses = {
    404: AlbumNameNotFoundError("name").openapi,
    409: {
        "description": "Image Duplicates Found",
        "content": {
            "application/json": {
                "example": {
                    "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
                    "duplicates": ["string"],
                    "access_token": "string",
                }
            }
        },
    },
}


@app.post(
    "/albums/{album}/photos",
    description="Upload a photo to album",
    response_class=UJSONResponse,
    response_model=Photo,
    responses=photo_post_responses,
)
async def photo_upload(
    file: UploadFile,
    album: str,
    ignore_duplicates: bool = False,
    compress: bool = True,
    caption: Union[str, None] = None,
    current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
    """Store an uploaded photo in *album* and register it in the database.

    Args:
        file: The uploaded image.
        album: Name of an existing album of the current user.
        ignore_duplicates: Store the photo even if perceptual-hash
            duplicates exist in the album.
        compress: Schedule background compression of the stored file.
        caption: Optional caption saved with the photo.
        current_user: Authenticated user (requires ``photos.write``).

    Returns:
        JSON with the new photo's id, album, hash and filename, or a 409
        response listing the duplicates.

    Raises:
        AlbumNameNotFoundError: if the album does not exist for this user.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    album_dir = _album_dir(current_user.user, album)
    makedirs(album_dir, exist_ok=True)

    # The client controls file.filename, so strip any directory components
    # before using it to build a path.
    filename = _unique_filename(album_dir, _safe_filename(file.filename))
    file_path = path.join(album_dir, filename)

    with open(file_path, "wb") as f:
        f.write(await file.read())

    file_hash = await get_phash(file_path)
    duplicates = await get_duplicates(file_hash, album)

    if duplicates and not ignore_duplicates:
        # The upload is rejected, so do not leave an orphan file without a
        # database record behind.
        remove(file_path)

        if configGet("media_token_access") is True:
            duplicates_ids = [entry["id"] for entry in duplicates]
            access_token = create_access_token(
                data={
                    "sub": current_user.user,
                    "scopes": ["me", "photos.read"],
                    "allowed": duplicates_ids,
                },
                expires_delta=timedelta(hours=configGet("media_token_valid_hours")),
            )
            access_token_short = uuid4().hex[:12].lower()
            col_tokens.insert_one(
                {
                    "short": access_token_short,
                    "access_token": access_token,
                    "photos": duplicates_ids,
                }
            )
        else:
            access_token_short = None

        return UJSONResponse(
            {
                "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
                "duplicates": duplicates,
                "access_token": access_token_short,
            },
            status_code=HTTP_409_CONFLICT,
        )

    try:
        coords = extract_location(file_path)
    except (UnpackError, ValueError):
        # No usable EXIF location: fall back to the origin.
        coords = {"lng": 0.0, "lat": 0.0, "alt": 0.0}

    uploaded = col_photos.insert_one(
        {
            "user": current_user.user,
            "album": album,
            "hash": file_hash,
            "filename": filename,
            "dates": {
                "uploaded": datetime.now(tz=timezone.utc),
                "modified": datetime.now(tz=timezone.utc),
            },
            "location": [coords["lng"], coords["lat"], coords["alt"]],
            "caption": caption,
        }
    )

    if compress is True:
        scheduler.add_job(
            compress_image,
            trigger="date",
            run_date=datetime.now() + timedelta(seconds=1),
            args=[file_path],
        )

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "album": album,
            "hash": file_hash,
            "filename": filename,
        }
    )


# Access to photos by token generated for example by
# upload method when duplicates are found. Is disabled
# by default and should remain so if not really needed.
if configGet("media_token_access") is True:
    photo_get_token_responses = {
        401: AccessTokenInvalidError().openapi,
        404: PhotoNotFoundError("id").openapi,
    }

    @app.get(
        "/token/photo/{token}",
        description="Get a photo by its duplicate token",
        responses=photo_get_token_responses,
    )
    async def photo_get_token(token: str, id: int):
        """Return the raw image referenced by a short duplicate token.

        Args:
            token: Short token stored in the tokens collection.
            id: Index into the list of photo ids attached to the token.

        Raises:
            AccessTokenInvalidError: if the token is unknown, cannot be
                decoded, has no valid user, or does not allow the photo.
            PhotoNotFoundError: if the index is out of range or the photo
                does not exist.
        """
        db_entry = col_tokens.find_one({"short": token})

        if db_entry is None:
            raise AccessTokenInvalidError()

        access_token = db_entry["access_token"]
        try:
            photo_id = db_entry["photos"][id]
        except IndexError as exc:
            raise PhotoNotFoundError(str(id)) from exc

        try:
            payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
            user: str = payload.get("sub")
            if user is None:
                raise AccessTokenInvalidError()
            token_scopes = payload.get("scopes", [])
            token_data = TokenData(scopes=token_scopes, user=user)
        except (JWTError, ValidationError) as exp:
            print(exp, flush=True)
            raise AccessTokenInvalidError()

        user = get_user(user=token_data.user)
        if user is None:
            raise AccessTokenInvalidError()

        if photo_id not in payload.get("allowed", []):
            raise AccessTokenInvalidError()

        image = _find_photo_or_404(photo_id, user.user)

        image_path = path.join(
            _album_dir(user.user, image["album"]), image["filename"]
        )

        mime = Magic(mime=True).from_file(image_path)

        with open(image_path, "rb") as f:
            image_file = f.read()

        return Response(image_file, media_type=mime)


photo_get_responses = {404: PhotoNotFoundError("id").openapi}


@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses)
async def photo_get(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["photos.read"]),
):
    """Return the raw image file of one of the current user's photos.

    Raises:
        PhotoNotFoundError: if the id is malformed, unknown, or the photo
            belongs to another user.
    """
    image = _find_photo_or_404(id, current_user.user)

    image_path = path.join(
        _album_dir(current_user.user, image["album"]), image["filename"]
    )

    mime = Magic(mime=True).from_file(image_path)

    with open(image_path, "rb") as f:
        image_file = f.read()

    return Response(image_file, media_type=mime)


photo_move_responses = {404: PhotoNotFoundError("id").openapi}


@app.put(
    "/photos/{id}",
    description="Move a photo to another album",
    response_model=PhotoPublic,
    responses=photo_move_responses,
)
async def photo_move(
    id: str,
    album: str,
    current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
    """Move one of the current user's photos into another of their albums.

    If a file with the same name already exists in the target album, the
    moved file gets a timestamp suffix.

    Raises:
        PhotoNotFoundError: if the photo does not exist for this user.
        AlbumNameNotFoundError: if the target album does not exist.
    """
    image = _find_photo_or_404(id, current_user.user)

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    target_dir = _album_dir(current_user.user, album)
    makedirs(target_dir, exist_ok=True)
    filename = _unique_filename(target_dir, image["filename"])

    # Move the file first: if this fails the database still describes the
    # real location of the photo.
    move(
        path.join(_album_dir(current_user.user, image["album"]), image["filename"]),
        path.join(target_dir, filename),
    )

    col_photos.find_one_and_update(
        {"_id": image["_id"]},
        {
            "$set": {
                "album": album,
                "filename": filename,
                "dates.modified": datetime.now(tz=timezone.utc),
            }
        },
    )

    return UJSONResponse(
        {
            "id": str(image["_id"]),
            "caption": image["caption"],
            "filename": filename,
        }
    )


photo_patch_responses = {404: PhotoNotFoundError("id").openapi}


@app.patch(
    "/photos/{id}",
    description="Change properties of a photo",
    response_model=PhotoPublic,
    responses=photo_patch_responses,
)
async def photo_patch(
    id: str,
    caption: str,
    current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
    """Update the caption of one of the current user's photos.

    Raises:
        PhotoNotFoundError: if the photo does not exist for this user.
    """
    image = _find_photo_or_404(id, current_user.user)

    col_photos.find_one_and_update(
        {"_id": image["_id"]},
        {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}},
    )

    return UJSONResponse(
        {
            "id": str(image["_id"]),
            "caption": caption,
            "filename": image["filename"],
        }
    )


photo_delete_responses = {404: PhotoNotFoundError("id").openapi}


@app.delete(
    "/photos/{id}",
    description="Delete a photo by id",
    status_code=HTTP_204_NO_CONTENT,
    responses=photo_delete_responses,
)
async def photo_delete(
    id: str,
    current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
    """Delete one of the current user's photos (record and file).

    If the photo was the cover of its album, the cover is reset to None.

    Raises:
        PhotoNotFoundError: if the photo does not exist for this user.
    """
    image = col_photos.find_one_and_delete(
        {"_id": _parse_photo_id(id), "user": current_user.user}
    )
    if image is None:
        raise PhotoNotFoundError(id)

    # Restrict the album lookup to the owner so a same-named album of
    # another user is never touched.
    album_filter = {"user": current_user.user, "name": image["album"]}
    album = col_albums.find_one(album_filter)

    if album is not None and album.get("cover") == str(image["_id"]):
        col_albums.update_one(album_filter, {"$set": {"cover": None}})

    image_path = path.join(
        _album_dir(current_user.user, image["album"]), image["filename"]
    )
    try:
        remove(image_path)
    except FileNotFoundError:
        # The record is already gone; a missing file should not fail the request.
        logWrite(f"File '{image_path}' was already missing when deleting photo {id}")

    return Response(status_code=HTTP_204_NO_CONTENT)


photo_find_responses = {
    400: SearchPageInvalidError().openapi,
    401: SearchTokenInvalidError().openapi,
    404: AlbumNameNotFoundError("name").openapi,
    422: PhotoSearchQueryEmptyError().openapi,
}


@app.get(
    "/albums/{album}/photos",
    description="Find a photo by filename, caption, location or token",
    response_class=UJSONResponse,
    response_model=SearchResultsPhoto,
    responses=photo_find_responses,
)
async def photo_find(
    album: str,
    q: Union[str, None] = None,
    caption: Union[str, None] = None,
    token: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
    lat: Union[float, None] = None,
    lng: Union[float, None] = None,
    radius: Union[int, None] = None,
    current_user: User = Security(get_current_active_user, scopes=["photos.list"]),
):
    """Search photos of an album by location, filename and/or caption.

    When *token* is given, the stored search parameters of that token are
    replayed (this is how ``next_page`` links work). When both *lat* and
    *lng* are given, a geo search within *radius* meters (default 5000) is
    performed and *q*/*caption* are ignored. Otherwise *q* and *caption*
    are treated as regular expressions matched against the filename and
    caption.

    Returns:
        JSON with ``results`` (id, filename, caption per photo) and
        ``next_page`` (URL with a continuation token, or None).

    Raises:
        SearchTokenInvalidError: if *token* is unknown.
        AlbumNameNotFoundError: if the album does not exist for this user.
        SearchPageInvalidError: if *page* or *page_size* is not positive.
        PhotoSearchQueryEmptyError: if no search criteria were provided.
        HTTPException: 422 if *q* or *caption* is not a valid regex.
    """
    if token is not None:
        found_record = col_tokens.find_one({"token": token})

        if found_record is None:
            raise SearchTokenInvalidError()

        return await photo_find(
            album=album,
            q=found_record["query"],
            caption=found_record["caption"],
            lat=found_record["lat"],
            lng=found_record["lng"],
            radius=found_record["radius"],
            page=found_record["page"],
            page_size=found_record["page_size"],
            current_user=current_user,
        )

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()

    skip = (page - 1) * page_size

    radius = 5000 if radius is None else radius

    if (lat is not None) and (lng is not None):
        db_query = {
            "user": current_user.user,
            "album": album,
            "location": {
                "$nearSphere": {
                    "$geometry": {"type": "Point", "coordinates": [lng, lat]},
                    "$maxDistance": radius,
                }
            },
        }
        # $nearSphere cannot be used for counting, so count with $geoWithin.
        # $centerSphere takes its radius in radians, not meters.
        db_query_count = {
            "user": current_user.user,
            "album": album,
            "location": {
                "$geoWithin": {
                    "$centerSphere": [[lng, lat], radius / _EARTH_RADIUS_METERS]
                }
            },
        }
    else:
        if q is None and caption is None:
            raise PhotoSearchQueryEmptyError()

        db_query = {"user": current_user.user, "album": album}
        if q is not None:
            db_query["filename"] = _compile_search_pattern(q)
        if caption is not None:
            db_query["caption"] = _compile_search_pattern(caption)
        db_query_count = db_query

    images = list(
        col_photos.find(db_query, limit=page_size, skip=skip).sort(
            "dates.uploaded", DESCENDING
        )
    )

    output = {
        "results": [
            {
                "id": str(image["_id"]),
                "filename": image["filename"],
                "caption": image["caption"],
            }
            for image in images
        ],
        "next_page": None,
    }

    if col_photos.count_documents(db_query_count) > page * page_size:
        # More results remain: persist the search so the next page can be
        # requested with a single opaque token.
        token = str(token_urlsafe(32))
        col_tokens.insert_one(
            {
                "token": token,
                "query": q,
                "caption": caption,
                "lat": lat,
                "lng": lng,
                "radius": radius,
                "page": page + 1,
                "page_size": page_size,
            }
        )
        output["next_page"] = f"/albums/{album}/photos/?token={token}"  # type: ignore

    return UJSONResponse(output)