import re import pickle from secrets import token_urlsafe from shutil import move from threading import Thread from typing import List, Union from magic import Magic from datetime import datetime, timedelta, timezone from os import makedirs, path, remove, system from classes.models import Photo, SearchResults from modules.exif_reader import extract_location from modules.hasher import get_phash, get_duplicates from modules.scheduler import scheduler from modules.security import User, get_current_active_user from modules.app import app from modules.database import col_photos, col_albums, col_tokens from pymongo import DESCENDING from bson.objectid import ObjectId from bson.errors import InvalidId from plum.exceptions import UnpackError from fastapi import HTTPException, UploadFile, Security from fastapi.responses import UJSONResponse, Response from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY from modules.utils import logWrite async def compress_image(image_path: str): image_type = Magic(mime=True).from_file(image_path) if image_type not in ["image/jpeg", "image/png"]: logWrite(f"Not compressing {image_path} because its mime is '{image_type}'") return size_before = path.getsize(image_path) / 1024 # system(f"exiftool -overwrite_original -all:all= -tagsFromFile @ -exif:Orientation {image_path}") if image_type == "image/jpeg": task = Thread(target=system, kwargs={"command": f'jpegoptim "{image_path}" -o --max=55 -p --strip-none'}) elif image_type == "image/png": task = Thread(target=system, kwargs={"command": f'optipng -o3 "{image_path}"'}) else: return task.start() logWrite(f"Compressing '{path.split(image_path)[-1]}'...") task.join() size_after = path.getsize(image_path) / 1024 logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb") @app.post("/albums/{album}/photos", response_class=UJSONResponse, response_model=Photo, description="Upload a photo to album") async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") # if not file.content_type.startswith("image"): # raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.") makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True) filename = file.filename if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)): base_name = file.filename.split(".")[:-1] extension = file.filename.split(".")[-1] filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f: f.write(await file.read()) file_hash = await get_phash(path.join("data", "users", current_user.user, "albums", album, filename)) duplicates = await get_duplicates(file_hash, album) if len(duplicates) > 0 and ignore_duplicates is False: return UJSONResponse( { "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.", "duplicates": duplicates }, status_code=HTTP_409_CONFLICT ) try: coords = extract_location(path.join("data", "users", current_user.user, "albums", album, filename)) except (UnpackError, ValueError): coords = { "lng": 0.0, "lat": 0.0, "alt": 0.0 } uploaded = col_photos.insert_one( { "user": current_user.user, "album": album, "hash": file_hash, "filename": filename, "dates": { "uploaded": datetime.now(tz=timezone.utc), "modified": datetime.now(tz=timezone.utc) }, "location": [ coords["lng"], coords["lat"], coords["alt"] ], "caption": caption } ) if compress is True: scheduler.add_job(compress_image, trigger="date", run_date=datetime.now()+timedelta(seconds=1), args=[path.join("data", "users", current_user.user, "albums", album, filename)]) return UJSONResponse( { "id": uploaded.inserted_id.__str__(), "album": album, "hash": file_hash, "filename": filename } ) @app.get("/photos/{id}", description="Get a photo by id") async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.read"])): try: image = col_photos.find_one( {"_id": ObjectId(id)} ) if image is None: raise InvalidId(id) except InvalidId: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]) mime = Magic(mime=True).from_file(image_path) with open(image_path, "rb") as f: image_file = f.read() return Response(image_file, media_type=mime) @app.put("/photos/{id}", description="Move a photo into another album") async def photo_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): try: image = col_photos.find_one( {"_id": ObjectId(id)} ) if image is None: raise InvalidId(id) except InvalidId: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") if path.exists(path.join("data", "users", current_user.user, "albums", album, image["filename"])): base_name = image["filename"].split(".")[:-1] extension = image["filename"].split(".")[-1] filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension else: filename = image["filename"] col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}} ) move( path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]), path.join("data", "users", current_user.user, "albums", album, filename) ) return UJSONResponse( { "id": image["_id"].__str__(), "filename": filename } ) @app.patch("/photos/{id}", description="Change properties of a photo") async def photo_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): try: image = col_photos.find_one( {"_id": ObjectId(id)} ) if image is None: raise InvalidId(id) except InvalidId: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} ) return UJSONResponse( { "id": image["_id"].__str__(), "caption": caption } ) @app.delete("/photos/{id}", description="Delete a photo by id") async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): try: image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} ) if image is None: raise InvalidId(id) except InvalidId: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") album = col_albums.find_one( {"name": image["album"]} ) if album is not None and album["cover"] == image["_id"].__str__(): col_albums.update_one( {"name": image["album"]}, {"$set": {"cover": None}} ) remove(path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])) return Response(status_code=HTTP_204_NO_CONTENT) @app.get("/albums/{album}/photos", response_class=UJSONResponse, response_model=SearchResults, description="Find a photo by filename") async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])): if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") if page <= 0 or page_size <= 0: raise HTTPException(status_code=HTTP_400_BAD_REQUEST, detail="Parameters 'page' and 'page_size' must be greater or equal to 1.") output = {"results": []} skip = (page-1)*page_size radius = 5000 if radius is None else radius if (lat is not None) and (lng is not None): db_query = {"user": current_user.user, "album": album, "location": { "$nearSphere": {"$geometry": {"type": "Point", "coordinates": [lng, lat]}, "$maxDistance": radius} } } db_query_count = {"user": current_user.user, "album": album, "location": { "$geoWithin": { "$centerSphere": [ [lng, lat], radius ] } } } elif q is None and caption is None: raise HTTPException(status_code=HTTP_422_UNPROCESSABLE_ENTITY, detail="You must provide query, caption or coordinates to look for photos") elif q is None and caption is not None: db_query = {"user": current_user.user, "album": album, "caption": re.compile(caption)} db_query_count = {"user": current_user.user, "album": album, "caption": re.compile(caption)} elif q is not None and caption is None: db_query = {"user": current_user.user, "album": album, "filename": re.compile(q)} db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q)} else: db_query = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore db_query_count = {"user": current_user.user, "album": album, "filename": re.compile(q), "caption": re.compile(caption)} # type: ignore images = list(col_photos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING)) for image in images: output["results"].append({"id": image["_id"].__str__(), "filename": image["filename"]}) if col_photos.count_documents( db_query_count ) > page*page_size: token = str(token_urlsafe(32)) col_tokens.insert_one( {"token": token, "query": q, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} ) output["next_page"] = f"/albums/{album}/photos/token?token={token}" # type: ignore else: output["next_page"] = None # type: ignore return UJSONResponse(output) @app.get("/albums/{album}/photos/token", response_class=UJSONResponse, response_model=SearchResults, description="Find a photo by token") async def photo_find_token(token: str): found_record = col_tokens.find_one( {"token": token} ) if found_record is None: raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid search token.") return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"]))