Formatted everything with black

This commit is contained in:
Profitroll 2023-03-12 14:59:13 +01:00
parent 47ae594079
commit f9df399682
19 changed files with 1024 additions and 455 deletions

View File

@ -3,51 +3,52 @@ from typing import Literal
class AlbumNotFoundError(Exception):
"""Raises HTTP 404 if no album with this ID found."""
def __init__(self, id: str):
self.id = id
self.openapi = {
"description": "Album Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find album with id '{id}'."
}
"example": {"detail": "Could not find album with id '{id}'."}
}
}
},
}
class AlbumNameNotFoundError(Exception):
"""Raises HTTP 404 if no album with this name found."""
def __init__(self, name: str):
self.name = name
self.openapi = {
"description": "Album Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find album with name '{name}'."
}
"example": {"detail": "Could not find album with name '{name}'."}
}
}
},
}
class AlbumAlreadyExistsError(Exception):
"""Raises HTTP 409 if album with this name already exists."""
def __init__(self, name: str):
self.name = name
self.openapi = {
"description": "Album Already Exists",
"content": {
"application/json": {
"example": {
"detail": "Album with name '{name}' already exists."
}
"example": {"detail": "Album with name '{name}' already exists."}
}
}
},
}
class AlbumIncorrectError(Exception):
"""Raises HTTP 406 if album's title or name is invalid."""
def __init__(self, place: Literal["name", "title"], error: str) -> None:
self.place = place
self.error = error
@ -55,30 +56,30 @@ class AlbumIncorrectError(Exception):
"description": "Album Name/Title Invalid",
"content": {
"application/json": {
"example": {
"detail": "Album {name/title} invalid: {error}"
}
"example": {"detail": "Album {name/title} invalid: {error}"}
}
}
},
}
class PhotoNotFoundError(Exception):
"""Raises HTTP 404 if no photo with this ID found."""
def __init__(self, id: str):
self.id = id
self.openapi = {
"description": "Photo Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find photo with id '{id}'."
}
"example": {"detail": "Could not find photo with id '{id}'."}
}
}
},
}
class PhotoSearchQueryEmptyError(Exception):
"""Raises HTTP 422 if no photo search query provided."""
def __init__(self):
self.openapi = {
"description": "Invalid Query",
@ -88,26 +89,28 @@ class PhotoSearchQueryEmptyError(Exception):
"detail": "You must provide query, caption or coordinates to look for photos."
}
}
}
},
}
class VideoNotFoundError(Exception):
"""Raises HTTP 404 if no video with this ID found."""
def __init__(self, id: str):
self.id = id
self.openapi = {
"description": "Video Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find video with id '{id}'."
}
"example": {"detail": "Could not find video with id '{id}'."}
}
}
},
}
class VideoSearchQueryEmptyError(Exception):
"""Raises HTTP 422 if no video search query provided."""
def __init__(self):
self.openapi = {
"description": "Invalid Query",
@ -117,11 +120,13 @@ class VideoSearchQueryEmptyError(Exception):
"detail": "You must provide query or caption to look for videos."
}
}
}
},
}
class SearchPageInvalidError(Exception):
"""Raises HTTP 400 if page or page size are not in valid range."""
def __init__(self):
self.openapi = {
"description": "Invalid Page",
@ -131,75 +136,69 @@ class SearchPageInvalidError(Exception):
"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."
}
}
}
},
}
class SearchTokenInvalidError(Exception):
"""Raises HTTP 401 if search token is not valid."""
def __init__(self):
self.openapi = {
"description": "Invalid Token",
"content": {
"application/json": {
"example": {
"detail": "Invalid search token."
}
}
}
"application/json": {"example": {"detail": "Invalid search token."}}
},
}
class UserEmailCodeInvalid(Exception):
"""Raises HTTP 400 if email confirmation code is not valid."""
def __init__(self):
self.openapi = {
"description": "Invalid Email Code",
"content": {
"application/json": {
"example": {
"detail": "Confirmation code is invalid."
}
"example": {"detail": "Confirmation code is invalid."}
}
}
},
}
class UserAlreadyExists(Exception):
"""Raises HTTP 409 if user with this name already exists."""
def __init__(self):
self.openapi = {
"description": "User Already Exists",
"content": {
"application/json": {
"example": {
"detail": "User with this username already exists."
}
"example": {"detail": "User with this username already exists."}
}
}
},
}
class AccessTokenInvalidError(Exception):
"""Raises HTTP 401 if access token is not valid."""
def __init__(self):
self.openapi = {
"description": "Invalid Access Token",
"content": {
"application/json": {
"example": {
"detail": "Invalid access token."
}
}
}
"application/json": {"example": {"detail": "Invalid access token."}}
},
}
class UserCredentialsInvalid(Exception):
"""Raises HTTP 401 if user credentials are not valid."""
def __init__(self):
self.openapi = {
"description": "Invalid Credentials",
"content": {
"application/json": {
"example": {
"detail": "Invalid credentials."
}
}
}
}
"application/json": {"example": {"detail": "Invalid credentials."}}
},
}

View File

@ -8,11 +8,13 @@ class Photo(BaseModel):
hash: str
filename: str
class PhotoPublic(BaseModel):
id: str
caption: str
filename: str
class PhotoSearch(BaseModel):
id: str
filename: str
@ -25,11 +27,13 @@ class Video(BaseModel):
hash: str
filename: str
class VideoPublic(BaseModel):
id: str
caption: str
filename: str
class VideoSearch(BaseModel):
id: str
filename: str
@ -41,11 +45,13 @@ class Album(BaseModel):
name: str
title: str
class AlbumSearch(BaseModel):
id: str
name: str
title: str
class AlbumModified(BaseModel):
name: str
title: str
@ -56,10 +62,12 @@ class SearchResultsAlbum(BaseModel):
results: List[Album]
next_page: Union[str, None]
class SearchResultsPhoto(BaseModel):
results: List[PhotoSearch]
next_page: Union[str, None]
class SearchResultsVideo(BaseModel):
results: List[VideoSearch]
next_page: Union[str, None]
next_page: Union[str, None]

View File

@ -2,69 +2,108 @@ import re
from os import makedirs, path, rename
from shutil import rmtree
from typing import Union
from classes.exceptions import AlbumAlreadyExistsError, AlbumIncorrectError, AlbumNotFoundError
from bson.errors import InvalidId
from bson.objectid import ObjectId
from fastapi import Security
from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT
from classes.exceptions import (
AlbumAlreadyExistsError,
AlbumIncorrectError,
AlbumNotFoundError,
)
from classes.models import Album, AlbumModified, SearchResultsAlbum
from modules.app import app
from modules.database import col_photos, col_albums
from modules.database import col_albums, col_photos
from modules.security import User, get_current_active_user
from bson.objectid import ObjectId
from bson.errors import InvalidId
from fastapi import Security
from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT
album_create_responses = {
406: AlbumIncorrectError("name", "error").openapi,
409: AlbumAlreadyExistsError("name").openapi
409: AlbumAlreadyExistsError("name").openapi,
}
@app.post("/albums", description="Create album with name and title", response_class=UJSONResponse, response_model=Album, responses=album_create_responses)
async def album_create(name: str, title: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
@app.post(
"/albums",
description="Create album with name and title",
response_class=UJSONResponse,
response_model=Album,
responses=album_create_responses,
)
async def album_create(
name: str,
title: str,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
if 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
if col_albums.find_one( {"name": name} ) is not None:
if col_albums.find_one({"name": name}) is not None:
raise AlbumAlreadyExistsError(name)
makedirs(path.join("data", "users", current_user.user, "albums", name), exist_ok=True)
uploaded = col_albums.insert_one( {"user": current_user.user, "name": name, "title": title, "cover": None} )
return UJSONResponse(
{
"id": uploaded.inserted_id.__str__(),
"name": name,
"title": title
}
makedirs(
path.join("data", "users", current_user.user, "albums", name), exist_ok=True
)
@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(q: str, current_user: User = Security(get_current_active_user, scopes=["albums.list"])):
uploaded = col_albums.insert_one(
{"user": current_user.user, "name": name, "title": title, "cover": None}
)
return UJSONResponse(
{"id": uploaded.inserted_id.__str__(), "name": name, "title": title}
)
@app.get("/albums", description="Find album by name", response_model=SearchResultsAlbum)
async def album_find(
q: str,
current_user: User = Security(get_current_active_user, scopes=["albums.list"]),
):
output = {"results": []}
albums = list(col_albums.find( {"user": current_user.user, "name": re.compile(q)} ))
albums = list(col_albums.find({"user": current_user.user, "name": re.compile(q)}))
for album in albums:
output["results"].append( {"id": album["_id"].__str__(), "name": album["name"], "title": album["title"]} )
output["results"].append(
{
"id": album["_id"].__str__(),
"name": album["name"],
"title": album["title"],
}
)
return UJSONResponse(output)
album_patch_responses = {
404: AlbumNotFoundError("id").openapi,
406: AlbumIncorrectError("name", "error").openapi
406: AlbumIncorrectError("name", "error").openapi,
}
@app.patch("/albums/{id}", description="Modify album's name or title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_patch_responses)
async def album_patch(id: str, name: Union[str, None] = None, title: Union[str, None] = None, cover: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
@app.patch(
"/albums/{id}",
description="Modify album's name or title by id",
response_class=UJSONResponse,
response_model=AlbumModified,
responses=album_patch_responses,
)
async def album_patch(
id: str,
name: Union[str, None] = None,
title: Union[str, None] = None,
cover: Union[str, None] = None,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
try:
album = col_albums.find_one( {"_id": ObjectId(id)} )
album = col_albums.find_one({"_id": ObjectId(id)})
if album is None:
raise InvalidId(id)
except InvalidId:
@ -77,49 +116,64 @@ async def album_patch(id: str, name: Union[str, None] = None, title: Union[str,
title = album["title"]
if name is not None:
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError(
"name", "can only contain a-z, 0-9 and _ characters."
)
if 2 > len(name) > 20:
raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
rename(
path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
path.join("data", "users", current_user.user, "albums", name),
)
col_photos.update_many(
{"user": current_user.user, "album": album["name"]},
{"$set": {"album": name}},
)
col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} )
else:
name = album["name"]
if cover is not None:
image = col_photos.find_one( {"_id": ObjectId(cover), "album": album["name"]} )
image = col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]})
cover = image["_id"].__str__() if image is not None else album["cover"]
else:
cover = album["cover"]
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} )
return UJSONResponse(
{
"name": name,
"title": title,
"cover": cover
}
col_albums.update_one(
{"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}}
)
return UJSONResponse({"name": name, "title": title, "cover": cover})
album_put_responses = {
404: AlbumNotFoundError("id").openapi,
406: AlbumIncorrectError("name", "error").openapi
406: AlbumIncorrectError("name", "error").openapi,
}
@app.put("/albums/{id}", description="Modify album's name and title by id", response_class=UJSONResponse, response_model=AlbumModified, responses=album_put_responses)
async def album_put(id: str, name: str, title: str, cover: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
@app.put(
"/albums/{id}",
description="Modify album's name and title by id",
response_class=UJSONResponse,
response_model=AlbumModified,
responses=album_put_responses,
)
async def album_put(
id: str,
name: str,
title: str,
cover: str,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
try:
album = col_albums.find_one( {"_id": ObjectId(id)} )
album = col_albums.find_one({"_id": ObjectId(id)})
if album is None:
raise InvalidId(id)
except InvalidId:
raise AlbumNotFoundError(id)
if re.search(re.compile('^[a-z,0-9,_]*$'), name) is False:
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError("name", "can only contain a-z, 0-9 and _ characters.")
if 2 > len(name) > 20:
@ -128,40 +182,46 @@ async def album_put(id: str, name: str, title: str, cover: str, current_user: Us
if 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
image = col_photos.find_one( {"_id": ObjectId(cover), "album": album["name"]} )
cover = image["_id"].__str__() if image is not None else None # type: ignore
image = col_photos.find_one({"_id": ObjectId(cover), "album": album["name"]})
cover = image["_id"].__str__() if image is not None else None # type: ignore
rename(
path.join("data", "users", current_user.user, "albums", album["name"]),
path.join("data", "users", current_user.user, "albums", name)
path.join("data", "users", current_user.user, "albums", name),
)
col_photos.update_many( {"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}} )
col_albums.update_one( {"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}} )
return UJSONResponse(
{
"name": name,
"title": title,
"cover": cover
}
col_photos.update_many(
{"user": current_user.user, "album": album["name"]}, {"$set": {"album": name}}
)
col_albums.update_one(
{"_id": ObjectId(id)}, {"$set": {"name": name, "title": title, "cover": cover}}
)
album_delete_responses = {
404: AlbumNotFoundError("id").openapi
}
@app.delete("/album/{id}", description="Delete album by id", status_code=HTTP_204_NO_CONTENT, responses=album_delete_responses)
async def album_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["albums.write"])):
return UJSONResponse({"name": name, "title": title, "cover": cover})
album_delete_responses = {404: AlbumNotFoundError("id").openapi}
@app.delete(
"/album/{id}",
description="Delete album by id",
status_code=HTTP_204_NO_CONTENT,
responses=album_delete_responses,
)
async def album_delete(
id: str,
current_user: User = Security(get_current_active_user, scopes=["albums.write"]),
):
try:
album = col_albums.find_one_and_delete( {"_id": ObjectId(id)} )
album = col_albums.find_one_and_delete({"_id": ObjectId(id)})
if album is None:
raise InvalidId(id)
except InvalidId:
raise AlbumNotFoundError(id)
col_photos.delete_many( {"album": album["name"]} )
col_photos.delete_many({"album": album["name"]})
rmtree(path.join("data", "users", current_user.user, "albums", album["name"]))
return Response(status_code=HTTP_204_NO_CONTENT)
return Response(status_code=HTTP_204_NO_CONTENT)

View File

@ -1,8 +1,16 @@
from fastapi import Request
from fastapi import Request
from fastapi.responses import UJSONResponse
from modules.app import app
from classes.exceptions import *
from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY
from starlette.status import (
HTTP_400_BAD_REQUEST,
HTTP_401_UNAUTHORIZED,
HTTP_404_NOT_FOUND,
HTTP_406_NOT_ACCEPTABLE,
HTTP_409_CONFLICT,
HTTP_422_UNPROCESSABLE_ENTITY,
)
@app.exception_handler(AlbumNotFoundError)
async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError):
@ -11,13 +19,17 @@ async def album_not_found_exception_handler(request: Request, exc: AlbumNotFound
content={"detail": f"Could not find album with id '{exc.id}'."},
)
@app.exception_handler(AlbumAlreadyExistsError)
async def album_already_exists_exception_handler(request: Request, exc: AlbumAlreadyExistsError):
async def album_already_exists_exception_handler(
request: Request, exc: AlbumAlreadyExistsError
):
return UJSONResponse(
status_code=HTTP_409_CONFLICT,
content={"detail": f"Album with name '{exc.name}' already exists."},
)
@app.exception_handler(AlbumIncorrectError)
async def album_incorrect_exception_handler(request: Request, exc: AlbumIncorrectError):
return UJSONResponse(
@ -25,6 +37,7 @@ async def album_incorrect_exception_handler(request: Request, exc: AlbumIncorrec
content={"detail": f"Album {exc.place} invalid: {exc.error}"},
)
@app.exception_handler(PhotoNotFoundError)
async def photo_not_found_exception_handler(request: Request, exc: PhotoNotFoundError):
return UJSONResponse(
@ -32,13 +45,19 @@ async def photo_not_found_exception_handler(request: Request, exc: PhotoNotFound
content={"detail": f"Could not find photo with id '{exc.id}'."},
)
@app.exception_handler(PhotoSearchQueryEmptyError)
async def photo_search_query_empty_exception_handler(request: Request, exc: PhotoSearchQueryEmptyError):
async def photo_search_query_empty_exception_handler(
request: Request, exc: PhotoSearchQueryEmptyError
):
return UJSONResponse(
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": "You must provide query, caption or coordinates to look for photos."},
content={
"detail": "You must provide query, caption or coordinates to look for photos."
},
)
@app.exception_handler(VideoNotFoundError)
async def video_not_found_exception_handler(request: Request, exc: VideoNotFoundError):
return UJSONResponse(
@ -46,51 +65,78 @@ async def video_not_found_exception_handler(request: Request, exc: VideoNotFound
content={"detail": f"Could not find video with id '{exc.id}'."},
)
@app.exception_handler(VideoSearchQueryEmptyError)
async def video_search_query_empty_exception_handler(request: Request, exc: VideoSearchQueryEmptyError):
async def video_search_query_empty_exception_handler(
request: Request, exc: VideoSearchQueryEmptyError
):
return UJSONResponse(
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
content={"detail": "You must provide query, caption or coordinates to look for photos."},
content={
"detail": "You must provide query, caption or coordinates to look for photos."
},
)
@app.exception_handler(SearchPageInvalidError)
async def search_page_invalid_exception_handler(request: Request, exc: SearchPageInvalidError):
async def search_page_invalid_exception_handler(
request: Request, exc: SearchPageInvalidError
):
return UJSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."},
content={
"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."
},
)
@app.exception_handler(SearchTokenInvalidError)
async def search_token_invalid_exception_handler(request: Request, exc: SearchTokenInvalidError):
async def search_token_invalid_exception_handler(
request: Request, exc: SearchTokenInvalidError
):
return UJSONResponse(
status_code=HTTP_401_UNAUTHORIZED,
content={"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."},
content={
"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."
},
)
@app.exception_handler(UserEmailCodeInvalid)
async def user_email_code_invalid_exception_handler(request: Request, exc: UserEmailCodeInvalid):
async def user_email_code_invalid_exception_handler(
request: Request, exc: UserEmailCodeInvalid
):
return UJSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"detail": "Confirmation code is invalid."},
)
@app.exception_handler(UserAlreadyExists)
async def user_already_exists_exception_handler(request: Request, exc: UserAlreadyExists):
async def user_already_exists_exception_handler(
request: Request, exc: UserAlreadyExists
):
return UJSONResponse(
status_code=HTTP_409_CONFLICT,
content={"detail": "User with this username already exists."},
)
@app.exception_handler(AccessTokenInvalidError)
async def access_token_invalid_exception_handler(request: Request, exc: AccessTokenInvalidError):
async def access_token_invalid_exception_handler(
request: Request, exc: AccessTokenInvalidError
):
return UJSONResponse(
status_code=HTTP_401_UNAUTHORIZED,
content={"detail": "Invalid access token."},
)
@app.exception_handler(UserCredentialsInvalid)
async def user_credentials_invalid_exception_handler(request: Request, exc: UserCredentialsInvalid):
async def user_credentials_invalid_exception_handler(
request: Request, exc: UserCredentialsInvalid
):
return UJSONResponse(
status_code=HTTP_401_UNAUTHORIZED,
content={"detail": "Invalid credentials."},
)
)

View File

@ -2,26 +2,30 @@ from os import path
from modules.app import app
from fastapi.responses import HTMLResponse, Response
@app.get("/pages/matter.css", include_in_schema=False)
async def page_matter():
with open(path.join("pages", "matter.css"), "r", encoding="utf-8") as f:
output = f.read()
return Response(content=output)
@app.get("/pages/{page}/{file}", include_in_schema=False)
async def page_assets(page:str, file: str):
async def page_assets(page: str, file: str):
with open(path.join("pages", page, file), "r", encoding="utf-8") as f:
output = f.read()
return Response(content=output)
@app.get("/", include_in_schema=False)
async def page_home():
with open(path.join("pages", "home", "index.html"), "r", encoding="utf-8") as f:
output = f.read()
return HTMLResponse(content=output)
@app.get("/register", include_in_schema=False)
async def page_register():
with open(path.join("pages", "register", "index.html"), "r", encoding="utf-8") as f:
output = f.read()
return HTMLResponse(content=output)
return HTMLResponse(content=output)

View File

@ -10,12 +10,27 @@ from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system
from pydantic import ValidationError
from classes.exceptions import AccessTokenInvalidError, AlbumNameNotFoundError, PhotoNotFoundError, PhotoSearchQueryEmptyError, SearchPageInvalidError, SearchTokenInvalidError
from classes.exceptions import (
AccessTokenInvalidError,
AlbumNameNotFoundError,
PhotoNotFoundError,
PhotoSearchQueryEmptyError,
SearchPageInvalidError,
SearchTokenInvalidError,
)
from classes.models import Photo, PhotoPublic, SearchResultsPhoto
from modules.exif_reader import extract_location
from modules.hasher import get_phash, get_duplicates
from modules.scheduler import scheduler
from modules.security import ALGORITHM, SECRET_KEY, TokenData, User, create_access_token, get_current_active_user, get_user
from modules.security import (
ALGORITHM,
SECRET_KEY,
TokenData,
User,
create_access_token,
get_current_active_user,
get_user,
)
from modules.app import app
from modules.database import col_photos, col_albums, col_tokens
from pymongo import DESCENDING
@ -27,12 +42,16 @@ from jose import JWTError, jwt
from fastapi import UploadFile, Security
from fastapi.responses import UJSONResponse, Response
from fastapi.exceptions import HTTPException
from starlette.status import HTTP_204_NO_CONTENT, HTTP_401_UNAUTHORIZED, HTTP_409_CONFLICT
from starlette.status import (
HTTP_204_NO_CONTENT,
HTTP_401_UNAUTHORIZED,
HTTP_409_CONFLICT,
)
from modules.utils import configGet, logWrite
async def compress_image(image_path: str):
async def compress_image(image_path: str):
image_type = Magic(mime=True).from_file(image_path)
if image_type not in ["image/jpeg", "image/png"]:
@ -42,7 +61,10 @@ async def compress_image(image_path: str):
size_before = path.getsize(image_path) / 1024
if image_type == "image/jpeg":
task = Thread(target=system, kwargs={"command": f'jpegoptim "{image_path}" -o --max=55 -p --strip-none'})
task = Thread(
target=system,
kwargs={"command": f'jpegoptim "{image_path}" -o --max=55 -p --strip-none'},
)
elif image_type == "image/png":
task = Thread(target=system, kwargs={"command": f'optipng -o3 "{image_path}"'})
else:
@ -53,7 +75,10 @@ async def compress_image(image_path: str):
task.join()
size_after = path.getsize(image_path) / 1024
logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb")
logWrite(
f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb"
)
photo_post_responses = {
404: AlbumNameNotFoundError("name").openapi,
@ -63,34 +88,56 @@ photo_post_responses = {
"application/json": {
"example": {
"detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
"duplicates": [
"string"
],
"access_token": "string"
"duplicates": ["string"],
"access_token": "string",
}
}
}
}
},
},
}
@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo, responses=photo_post_responses)
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@app.post(
"/albums/{album}/photos",
description="Upload a photo to album",
response_class=UJSONResponse,
response_model=Photo,
responses=photo_post_responses,
)
async def photo_upload(
file: UploadFile,
album: str,
ignore_duplicates: bool = False,
compress: bool = True,
caption: Union[str, None] = None,
current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album)
makedirs(path.join("data", "users", current_user.user, "albums", album), exist_ok=True)
makedirs(
path.join("data", "users", current_user.user, "albums", album), exist_ok=True
)
filename = file.filename
if path.exists(path.join("data", "users", current_user.user, "albums", album, file.filename)):
if path.exists(
path.join("data", "users", current_user.user, "albums", album, file.filename)
):
base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
filename = (
".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension
)
with open(path.join("data", "users", current_user.user, "albums", album, filename), "wb") as f:
with open(
path.join("data", "users", current_user.user, "albums", album, filename), "wb"
) as f:
f.write(await file.read())
file_hash = await get_phash(path.join("data", "users", current_user.user, "albums", album, filename))
file_hash = await get_phash(
path.join("data", "users", current_user.user, "albums", album, filename)
)
duplicates = await get_duplicates(file_hash, album)
if len(duplicates) > 0 and ignore_duplicates is False:
@ -98,73 +145,90 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F
duplicates_ids = []
for entry in duplicates:
duplicates_ids.append(entry["id"])
access_token = create_access_token(data={"sub": current_user.user, "scopes": ["me", "photos.read"], "allowed": duplicates_ids}, expires_delta=timedelta(hours=configGet("media_token_valid_hours")))
access_token = create_access_token(
data={
"sub": current_user.user,
"scopes": ["me", "photos.read"],
"allowed": duplicates_ids,
},
expires_delta=timedelta(hours=configGet("media_token_valid_hours")),
)
access_token_short = uuid4().hex[:12].lower()
col_tokens.insert_one({"short": access_token_short, "access_token": access_token, "photos": duplicates_ids})
col_tokens.insert_one(
{
"short": access_token_short,
"access_token": access_token,
"photos": duplicates_ids,
}
)
else:
access_token_short = None
return UJSONResponse(
{
"detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
"duplicates": duplicates,
"access_token": access_token_short
"access_token": access_token_short,
},
status_code=HTTP_409_CONFLICT
status_code=HTTP_409_CONFLICT,
)
try:
coords = extract_location(path.join("data", "users", current_user.user, "albums", album, filename))
coords = extract_location(
path.join("data", "users", current_user.user, "albums", album, filename)
)
except (UnpackError, ValueError):
coords = {
"lng": 0.0,
"lat": 0.0,
"alt": 0.0
}
coords = {"lng": 0.0, "lat": 0.0, "alt": 0.0}
uploaded = col_photos.insert_one(
{
"user": current_user.user,
"album": album,
"album": album,
"hash": file_hash,
"filename": filename,
"dates": {
"uploaded": datetime.now(tz=timezone.utc),
"modified": datetime.now(tz=timezone.utc)
"modified": datetime.now(tz=timezone.utc),
},
"location": [
coords["lng"],
coords["lat"],
coords["alt"]
],
"caption": caption
"location": [coords["lng"], coords["lat"], coords["alt"]],
"caption": caption,
}
)
if compress is True:
scheduler.add_job(compress_image, trigger="date", run_date=datetime.now()+timedelta(seconds=1), args=[path.join("data", "users", current_user.user, "albums", album, filename)])
scheduler.add_job(
compress_image,
trigger="date",
run_date=datetime.now() + timedelta(seconds=1),
args=[
path.join("data", "users", current_user.user, "albums", album, filename)
],
)
return UJSONResponse(
{
"id": uploaded.inserted_id.__str__(),
"album": album,
"hash": file_hash,
"filename": filename
"filename": filename,
}
)
# Access to photos y token generated for example by
# upload method when duplicates are found. Is disabled
# by default and should remain so if not really needed.
if configGet("media_token_access") is True:
photo_get_token_responses = {
401: AccessTokenInvalidError().openapi,
404: PhotoNotFoundError("id").openapi
404: PhotoNotFoundError("id").openapi,
}
@app.get("/token/photo/{token}", description="Get a photo by its duplicate token", responses=photo_get_token_responses)
@app.get(
"/token/photo/{token}",
description="Get a photo by its duplicate token",
responses=photo_get_token_responses,
)
async def photo_get_token(token: str, id: int):
db_entry = col_tokens.find_one({"short": token})
if db_entry is None:
@ -183,188 +247,335 @@ if configGet("media_token_access") is True:
except (JWTError, ValidationError) as exp:
print(exp, flush=True)
raise AccessTokenInvalidError()
user = get_user(user=token_data.user)
if id not in payload.get("allowed", []):
raise AccessTokenInvalidError()
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
image = col_photos.find_one({"_id": ObjectId(id)})
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
image_path = path.join("data", "users", user.user, "albums", image["album"], image["filename"])
image_path = path.join(
"data", "users", user.user, "albums", image["album"], image["filename"]
)
mime = Magic(mime=True).from_file(image_path)
with open(image_path, "rb") as f: image_file = f.read()
with open(image_path, "rb") as f:
image_file = f.read()
return Response(image_file, media_type=mime)
photo_get_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses)
async def photo_get(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.read"])):
photo_get_responses = {404: PhotoNotFoundError("id").openapi}
@app.get("/photos/{id}", description="Get a photo by id", responses=photo_get_responses)
async def photo_get(
id: str,
current_user: User = Security(get_current_active_user, scopes=["photos.read"]),
):
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
image = col_photos.find_one({"_id": ObjectId(id)})
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])
image_path = path.join(
"data", "users", current_user.user, "albums", image["album"], image["filename"]
)
mime = Magic(mime=True).from_file(image_path)
with open(image_path, "rb") as f: image_file = f.read()
with open(image_path, "rb") as f:
image_file = f.read()
return Response(image_file, media_type=mime)
photo_move_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.put("/photos/{id}", description="Move a photo to another album", response_model=PhotoPublic, responses=photo_move_responses)
async def photo_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
photo_move_responses = {404: PhotoNotFoundError("id").openapi}
@app.put(
"/photos/{id}",
description="Move a photo to another album",
response_model=PhotoPublic,
responses=photo_move_responses,
)
async def photo_move(
id: str,
album: str,
current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
image = col_photos.find_one({"_id": ObjectId(id)})
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album)
if path.exists(path.join("data", "users", current_user.user, "albums", album, image["filename"])):
if path.exists(
path.join(
"data", "users", current_user.user, "albums", album, image["filename"]
)
):
base_name = image["filename"].split(".")[:-1]
extension = image["filename"].split(".")[-1]
filename = ".".join(base_name)+f"_{int(datetime.now().timestamp())}."+extension
filename = (
".".join(base_name) + f"_{int(datetime.now().timestamp())}." + extension
)
else:
filename = image["filename"]
col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"album": album, "filename": filename, "dates.modified": datetime.now(tz=timezone.utc)}} )
col_photos.find_one_and_update(
{"_id": ObjectId(id)},
{
"$set": {
"album": album,
"filename": filename,
"dates.modified": datetime.now(tz=timezone.utc),
}
},
)
move(
path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]),
path.join("data", "users", current_user.user, "albums", album, filename)
path.join(
"data",
"users",
current_user.user,
"albums",
image["album"],
image["filename"],
),
path.join("data", "users", current_user.user, "albums", album, filename),
)
return UJSONResponse(
{
"id": image["_id"].__str__(),
"caption": image["caption"],
"filename": filename
"filename": filename,
}
)
photo_patch_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.patch("/photos/{id}", description="Change properties of a photo", response_model=PhotoPublic, responses=photo_patch_responses)
async def photo_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
photo_patch_responses = {404: PhotoNotFoundError("id").openapi}
@app.patch(
"/photos/{id}",
description="Change properties of a photo",
response_model=PhotoPublic,
responses=photo_patch_responses,
)
async def photo_patch(
id: str,
caption: str,
current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
image = col_photos.find_one({"_id": ObjectId(id)})
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} )
col_photos.find_one_and_update(
{"_id": ObjectId(id)},
{"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}},
)
return UJSONResponse(
{
"id": image["_id"].__str__(),
"caption": caption,
"filename": image["filename"]
"filename": image["filename"],
}
)
photo_delete_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.delete("/photos/{id}", description="Delete a photo by id", status_code=HTTP_204_NO_CONTENT, responses=photo_delete_responses)
async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
photo_delete_responses = {404: PhotoNotFoundError("id").openapi}
@app.delete(
"/photos/{id}",
description="Delete a photo by id",
status_code=HTTP_204_NO_CONTENT,
responses=photo_delete_responses,
)
async def photo_delete(
id: str,
current_user: User = Security(get_current_active_user, scopes=["photos.write"]),
):
try:
image = col_photos.find_one_and_delete( {"_id": ObjectId(id)} )
image = col_photos.find_one_and_delete({"_id": ObjectId(id)})
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
album = col_albums.find_one( {"name": image["album"]} )
album = col_albums.find_one({"name": image["album"]})
if album is not None and album["cover"] == image["_id"].__str__():
col_albums.update_one( {"name": image["album"]}, {"$set": {"cover": None}} )
col_albums.update_one({"name": image["album"]}, {"$set": {"cover": None}})
remove(path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]))
remove(
path.join(
"data",
"users",
current_user.user,
"albums",
image["album"],
image["filename"],
)
)
return Response(status_code=HTTP_204_NO_CONTENT)
photo_find_responses = {
400: SearchPageInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi,
422: PhotoSearchQueryEmptyError().openapi
422: PhotoSearchQueryEmptyError().openapi,
}
@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_responses)
async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
@app.get(
"/albums/{album}/photos",
description="Find a photo by filename",
response_class=UJSONResponse,
response_model=SearchResultsPhoto,
responses=photo_find_responses,
)
async def photo_find(
album: str,
q: Union[str, None] = None,
caption: Union[str, None] = None,
page: int = 1,
page_size: int = 100,
lat: Union[float, None] = None,
lng: Union[float, None] = None,