PhotosAPI/extensions/photos.py
2023-02-18 00:47:00 +01:00

361 lines
14 KiB
Python

import re
import pickle
from secrets import token_urlsafe
from shutil import move
from threading import Thread
from typing import Union
from uuid import uuid4
from magic import Magic
from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system
from pydantic import ValidationError
from classes.exceptions import AccessTokenInvalidError, AlbumNameNotFoundError, PhotoNotFoundError, PhotoSearchQueryEmptyError, SearchPageInvalidError, SearchTokenInvalidError
from classes.models import Photo, PhotoPublic, SearchResultsPhoto
from modules.exif_reader import extract_location
from modules.hasher import get_phash, get_duplicates
from modules.scheduler import scheduler
from modules.security import ALGORITHM, SECRET_KEY, TokenData, User, create_access_token, get_current_active_user, get_user
from modules.app import app
from modules.database import col_photos, col_albums, col_tokens
from pymongo import DESCENDING
from bson.objectid import ObjectId
from bson.errors import InvalidId
from plum.exceptions import UnpackError
from jose import JWTError, jwt
from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response
from fastapi.exceptions import HTTPException
from starlette.status import HTTP_204_NO_CONTENT, HTTP_401_UNAUTHORIZED, HTTP_409_CONFLICT
from modules.utils import logWrite
async def compress_image(image_path: str):
    """Compress a JPEG or PNG file in place using an external optimizer.

    The file's MIME type is detected with libmagic. JPEG files are passed to
    ``jpegoptim`` and PNG files to ``optipng``; any other type is logged and
    left untouched. File sizes before and after are logged in kilobytes.

    Args:
        image_path: Path of the image file to compress.
    """
    # Imported locally so this block does not depend on a new file-level import.
    from subprocess import run

    image_type = Magic(mime=True).from_file(image_path)

    # Commands are argument vectors rather than shell strings: image_path
    # contains a filename that originates from an upload, so it must never be
    # interpreted by a shell.
    commands = {
        "image/jpeg": ["jpegoptim", image_path, "-o", "--max=55", "-p", "--strip-none"],
        "image/png": ["optipng", "-o3", image_path],
    }
    command = commands.get(image_type)

    if command is None:
        logWrite(f"Not compressing {image_path} because its mime is '{image_type}'")
        return

    image_name = path.split(image_path)[-1]
    size_before = path.getsize(image_path) / 1024

    task = Thread(target=run, args=(command,))
    task.start()
    logWrite(f"Compressing '{image_name}'...")
    # Wait for the optimizer to finish so the size read below is the final one.
    task.join()

    size_after = path.getsize(image_path) / 1024
    logWrite(f"Compressed '{image_name}' from {size_before} Kb to {size_after} Kb")
# Extra OpenAPI responses documented for the upload endpoint.
photo_post_responses = {
    404: AlbumNameNotFoundError("name").openapi,
    409: {
        "description": "Image Duplicates Found",
        "content": {
            "application/json": {
                "example": {
                    "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
                    "duplicates": [
                        "string"
                    ],
                    "access_token": "string"
                }
            }
        }
    }
}


@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo, responses=photo_post_responses)
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
    """Store an uploaded photo in one of the current user's albums.

    The file is written under ``data/users/<user>/albums/<album>/``, hashed,
    and checked for duplicates. Unless ``ignore_duplicates`` is set, a
    duplicate upload is rejected with HTTP 409 together with a short token
    that maps to a one-hour access token for the duplicate photos.

    Args:
        file: Uploaded file.
        album: Name of the target album; must exist for the current user.
        ignore_duplicates: Store the photo even if duplicates are found.
        compress: Schedule a background compression job after the upload.
        caption: Optional caption stored with the photo.
        current_user: Authenticated user holding the ``photos.write`` scope.

    Returns:
        JSON with the new photo's ``id``, ``album``, ``hash`` and
        ``filename``, or a 409 JSON response describing the duplicates.

    Raises:
        AlbumNameNotFoundError: The album does not exist for this user.
    """
    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    album_dir = path.join("data", "users", current_user.user, "albums", album)
    makedirs(album_dir, exist_ok=True)

    # The client controls the filename: keep only its final path component so
    # the file cannot be written outside the album directory.
    filename = path.basename(file.filename or "")
    if filename in ("", ".", ".."):
        # Nothing usable was supplied; fall back to a generated name.
        filename = uuid4().hex

    if path.exists(path.join(album_dir, filename)):
        # Avoid overwriting an existing file by inserting a timestamp before
        # the extension. splitext also handles names without an extension.
        base_name, extension = path.splitext(filename)
        filename = f"{base_name}_{int(datetime.now().timestamp())}{extension}"

    file_path = path.join(album_dir, filename)

    with open(file_path, "wb") as f:
        f.write(await file.read())

    file_hash = await get_phash(file_path)
    duplicates = await get_duplicates(file_hash, album)

    if len(duplicates) > 0 and ignore_duplicates is False:
        # The upload is rejected and never recorded in the database, so the
        # file written above would otherwise be orphaned on disk.
        remove(file_path)

        duplicates_ids = [entry["id"] for entry in duplicates]
        access_token = create_access_token(data={"sub": current_user.user, "scopes": ["me", "photos.read"], "allowed": duplicates_ids}, expires_delta=timedelta(hours=1))
        access_token_short = uuid4().hex[:12].lower()
        col_tokens.insert_one({"short": access_token_short, "access_token": access_token, "photos": duplicates_ids})
        return UJSONResponse(
            {
                "detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
                "duplicates": duplicates,
                "access_token": access_token_short
            },
            status_code=HTTP_409_CONFLICT
        )

    try:
        coords = extract_location(file_path)
    except (UnpackError, ValueError):
        # Location could not be extracted; store zeroed coordinates instead.
        coords = {
            "lng": 0.0,
            "lat": 0.0,
            "alt": 0.0
        }

    uploaded = col_photos.insert_one(
        {
            "user": current_user.user,
            "album": album,
            "hash": file_hash,
            "filename": filename,
            "dates": {
                "uploaded": datetime.now(tz=timezone.utc),
                "modified": datetime.now(tz=timezone.utc)
            },
            "location": [
                coords["lng"],
                coords["lat"],
                coords["alt"]
            ],
            "caption": caption
        }
    )

    if compress is True:
        # Compression runs as a one-off job one second from now.
        scheduler.add_job(compress_image, trigger="date", run_date=datetime.now()+timedelta(seconds=1), args=[file_path])

    return UJSONResponse(
        {
            "id": str(uploaded.inserted_id),
            "album": album,
            "hash": file_hash,
            "filename": filename
        }
    )
# Extra OpenAPI responses documented for token-based photo access.
photo_get_token_responses = {
    401: AccessTokenInvalidError().openapi,
    404: PhotoNotFoundError("id").openapi
}


@app.get("/token/photo/{token}", description="Get a photo by id", responses=photo_get_token_responses)
async def photo_get_token(token: str, id: int):
    """Serve a photo referenced by a short access token.

    The short token is looked up in the tokens collection; the stored record
    holds the real JWT and a list of photo ids. ``id`` is an index into that
    list, not a database id.

    Args:
        token: Short token stored in the tokens collection.
        id: Index of the wanted photo within the token's photo list.

    Returns:
        The raw image bytes with the detected MIME type.

    Raises:
        AccessTokenInvalidError: Unknown short token, undecodable JWT, or the
            photo is not in the JWT's ``allowed`` list.
        PhotoNotFoundError: The index is out of range or the photo does not
            exist.
    """
    db_entry = col_tokens.find_one({"short": token})
    if db_entry is None:
        raise AccessTokenInvalidError()

    access_token = db_entry["access_token"]

    try:
        photo_id = db_entry["photos"][id]
    except IndexError:
        # An out-of-range index is a client error, not a server failure.
        raise PhotoNotFoundError(str(id)) from None

    try:
        payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
        subject: str = payload.get("sub")
        if subject is None:
            raise AccessTokenInvalidError()
        token_scopes = payload.get("scopes", [])
        token_data = TokenData(scopes=token_scopes, user=subject)
    except (JWTError, ValidationError) as exp:
        print(exp, flush=True)
        raise AccessTokenInvalidError()

    owner = get_user(user=token_data.user)

    if photo_id not in payload.get("allowed", []):
        raise AccessTokenInvalidError()

    try:
        image = col_photos.find_one({"_id": ObjectId(photo_id)})
    except InvalidId:
        image = None
    if image is None:
        raise PhotoNotFoundError(photo_id)

    image_path = path.join("data", "users", owner.user, "albums", image["album"], image["filename"])
    mime = Magic(mime=True).from_file(image_path)

    with open(image_path, "rb") as f:
        image_file = f.read()

    return Response(image_file, media_type=mime)
# Extra OpenAPI responses documented for fetching a photo.
photo_get_responses = {
    404: PhotoNotFoundError("id").openapi
}


@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses)
async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.read"])):
    """Return the binary content of one of the current user's photos.

    Args:
        id: Database id of the photo.
        current_user: Authenticated user holding the ``photos.read`` scope.

    Returns:
        The raw image bytes with the detected MIME type.

    Raises:
        PhotoNotFoundError: The id is malformed or no photo with that id
            belongs to the current user.
    """
    try:
        # The lookup is scoped to the caller: the file path below is built
        # from the caller's directory, so only their own photos can be served.
        image = col_photos.find_one({"_id": ObjectId(id), "user": current_user.user})
    except InvalidId:
        image = None
    if image is None:
        raise PhotoNotFoundError(id)

    image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])
    mime = Magic(mime=True).from_file(image_path)

    with open(image_path, "rb") as f:
        image_file = f.read()

    return Response(image_file, media_type=mime)
# Extra OpenAPI responses documented for moving a photo.
photo_move_responses = {
    404: PhotoNotFoundError("id").openapi
}


@app.put("/photos/{id}", description="Move a photo to another album", response_model=PhotoPublic, responses=photo_move_responses)
async def photo_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
    """Move one of the current user's photos into another of their albums.

    If a file with the same name already exists in the target album, a
    timestamp is inserted before the extension to avoid overwriting it.

    Args:
        id: Database id of the photo.
        album: Name of the target album; must exist for the current user.
        current_user: Authenticated user holding the ``photos.write`` scope.

    Returns:
        JSON with the photo's ``id``, ``caption`` and (possibly new)
        ``filename``.

    Raises:
        PhotoNotFoundError: The id is malformed or no photo with that id
            belongs to the current user.
        AlbumNameNotFoundError: The target album does not exist for this user.
    """
    try:
        # Scoped to the caller so one user cannot move another user's photo.
        image = col_photos.find_one({"_id": ObjectId(id), "user": current_user.user})
    except InvalidId:
        image = None
    if image is None:
        raise PhotoNotFoundError(id)

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    albums_root = path.join("data", "users", current_user.user, "albums")
    target_dir = path.join(albums_root, album)
    # The album exists in the database; make sure its directory does as well.
    makedirs(target_dir, exist_ok=True)

    filename = image["filename"]
    if path.exists(path.join(target_dir, filename)):
        # splitext also handles filenames that have no extension.
        base_name, extension = path.splitext(filename)
        filename = f"{base_name}_{int(datetime.now().timestamp())}{extension}"

    # Move the file first: if that fails, the database record still points at
    # the file's real location.
    move(
        path.join(albums_root, image["album"], image["filename"]),
        path.join(target_dir, filename)
    )

    col_photos.find_one_and_update(
        {"_id": image["_id"]},
        {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}}
    )

    return UJSONResponse(
        {
            "id": str(image["_id"]),
            "caption": image["caption"],
            "filename": filename
        }
    )
# Extra OpenAPI responses documented for patching a photo.
photo_patch_responses = {
    404: PhotoNotFoundError("id").openapi
}


@app.patch("/photos/{id}", description="Change properties of a photo", response_model=PhotoPublic, responses=photo_patch_responses)
async def photo_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
    """Change the caption of one of the current user's photos.

    Args:
        id: Database id of the photo.
        caption: New caption.
        current_user: Authenticated user holding the ``photos.write`` scope.

    Returns:
        JSON with the photo's ``id``, new ``caption`` and ``filename``.

    Raises:
        PhotoNotFoundError: The id is malformed or no photo with that id
            belongs to the current user.
    """
    try:
        # One owner-scoped update: a user cannot edit another user's photo.
        # find_one_and_update returns the matched document (None if no match).
        image = col_photos.find_one_and_update(
            {"_id": ObjectId(id), "user": current_user.user},
            {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}}
        )
    except InvalidId:
        image = None
    if image is None:
        raise PhotoNotFoundError(id)

    return UJSONResponse(
        {
            "id": str(image["_id"]),
            "caption": caption,
            "filename": image["filename"]
        }
    )
# Extra OpenAPI responses documented for deleting a photo.
photo_delete_responses = {
    404: PhotoNotFoundError("id").openapi
}


@app.delete("/photos/{id}", description="Delete a photo by id", status_code=HTTP_204_NO_CONTENT, responses=photo_delete_responses)
async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
    """Delete one of the current user's photos from the database and disk.

    If the photo was the cover of its album, the album's cover is reset to
    ``None``.

    Args:
        id: Database id of the photo.
        current_user: Authenticated user holding the ``photos.write`` scope.

    Returns:
        An empty 204 response.

    Raises:
        PhotoNotFoundError: The id is malformed or no photo with that id
            belongs to the current user.
    """
    try:
        # Scoped to the caller so one user cannot delete another user's photo.
        image = col_photos.find_one_and_delete({"_id": ObjectId(id), "user": current_user.user})
    except InvalidId:
        image = None
    if image is None:
        raise PhotoNotFoundError(id)

    # Match the album by owner as well as name: filtering on the name alone
    # could hit a same-named album of a different user.
    album_filter = {"user": current_user.user, "name": image["album"]}
    album = col_albums.find_one(album_filter)
    if album is not None and album.get("cover") == str(image["_id"]):
        col_albums.update_one(album_filter, {"$set": {"cover": None}})

    image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])
    try:
        remove(image_path)
    except FileNotFoundError:
        # The record is already deleted; a missing file should not turn the
        # request into a server error.
        logWrite(f"File '{image_path}' was already missing while deleting photo {id}")

    return Response(status_code=HTTP_204_NO_CONTENT)
# Extra OpenAPI responses documented for photo search.
photo_find_responses = {
    400: SearchPageInvalidError().openapi,
    404: AlbumNameNotFoundError("name").openapi,
    422: PhotoSearchQueryEmptyError().openapi
}


@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_responses)
async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
    """Search photos of an album by location, filename and/or caption.

    When both ``lat`` and ``lng`` are given, a geospatial search is made and
    ``q``/``caption`` are ignored. Otherwise ``q`` and ``caption`` are used
    as regular expressions against the filename and caption respectively.
    Results are sorted by upload date, newest first.

    Args:
        album: Album name; must exist for the current user.
        q: Regular expression matched against filenames.
        caption: Regular expression matched against captions.
        page: 1-based page number.
        page_size: Number of results per page.
        lat: Latitude of the search centre.
        lng: Longitude of the search centre.
        radius: Search radius in meters; defaults to 5000.
        current_user: Authenticated user holding the ``photos.list`` scope.

    Returns:
        JSON with ``results`` (id, filename, caption per photo) and
        ``next_page`` (URL with a continuation token, or ``None``).

    Raises:
        AlbumNameNotFoundError: The album does not exist for this user.
        SearchPageInvalidError: ``page`` or ``page_size`` is not positive.
        PhotoSearchQueryEmptyError: No search criteria were supplied.
        HTTPException: 422 if ``q`` or ``caption`` is not a valid regular
            expression.
    """
    # Radius of the Earth in meters, used to convert a distance to radians.
    earth_radius_meters = 6378100

    if col_albums.find_one({"user": current_user.user, "name": album}) is None:
        raise AlbumNameNotFoundError(album)

    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()

    skip = (page-1)*page_size
    radius = 5000 if radius is None else radius
    base_filter = {"user": current_user.user, "album": album}

    if (lat is not None) and (lng is not None):
        db_query = {**base_filter, "location": { "$nearSphere": {"$geometry": {"type": "Point", "coordinates": [lng, lat]}, "$maxDistance": radius} } }
        # A separate filter is used for counting. $centerSphere takes its
        # radius in radians, whereas $maxDistance above is in meters.
        db_query_count = {**base_filter, "location": { "$geoWithin": { "$centerSphere": [ [lng, lat], radius / earth_radius_meters ] } } }
    elif q is None and caption is None:
        raise PhotoSearchQueryEmptyError()
    else:
        text_filter = {}
        try:
            if q is not None:
                text_filter["filename"] = re.compile(q)
            if caption is not None:
                text_filter["caption"] = re.compile(caption)
        except re.error as exc:
            # A malformed user-supplied pattern is a client error.
            raise HTTPException(status_code=422, detail=f"Invalid regular expression: {exc}") from exc
        db_query = {**base_filter, **text_filter}
        db_query_count = db_query

    images = list(col_photos.find(db_query, limit=page_size, skip=skip).sort('dates.uploaded', DESCENDING))

    output = {
        "results": [
            {"id": str(image["_id"]), "filename": image["filename"], "caption": image["caption"]}
            for image in images
        ],
        "next_page": None
    }

    if col_photos.count_documents( db_query_count ) > page*page_size:
        token = token_urlsafe(32)
        # Every search parameter is stored so the next page can repeat the
        # same search, not only the filename query.
        # NOTE(review): the user object is pickled into the database and
        # unpickled when the token is used; anyone able to write to the
        # tokens collection could get code executed. Consider storing the
        # user name instead.
        col_tokens.insert_one( {"token": token, "query": q, "caption": caption, "lat": lat, "lng": lng, "radius": radius, "album": album, "page": page+1, "page_size": page_size, "user": pickle.dumps(current_user)} )
        output["next_page"] = f"/albums/{album}/photos/token?token={token}" # type: ignore

    return UJSONResponse(output)
# Extra OpenAPI responses documented for token-based search continuation.
photo_find_token_responses = {
    401: SearchTokenInvalidError().openapi
}


@app.get("/albums/{album}/photos/token", description="Find a photo by token", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_token_responses)
async def photo_find_token(token: str):
    """Continue a previous photo search using a pagination token.

    The token's record is loaded from the tokens collection and the stored
    search is replayed through ``photo_find``.

    Args:
        token: Continuation token taken from a ``next_page`` URL.

    Returns:
        The same response ``photo_find`` produces for the stored search.

    Raises:
        SearchTokenInvalidError: No record exists for the token.
    """
    found_record = col_tokens.find_one( {"token": token} )

    if found_record is None:
        raise SearchTokenInvalidError()

    # NOTE(review): the stored user is restored with pickle.loads; anyone
    # able to write to the tokens collection could get code executed.
    # Consider storing the user name and re-loading the user instead.
    stored_user = pickle.loads(found_record["user"])

    # Caption and geo parameters are read with .get() because a record may
    # hold only the filename query; missing values fall back to None.
    return await photo_find(
        q=found_record["query"],
        caption=found_record.get("caption"),
        lat=found_record.get("lat"),
        lng=found_record.get("lng"),
        radius=found_record.get("radius"),
        album=found_record["album"],
        page=found_record["page"],
        page_size=found_record["page_size"],
        current_user=stored_user
    )