4 Commits

Author SHA1 Message Date
fa3aca30c2 Removed unused imports 2023-02-16 14:56:28 +01:00
c353a4a4df WIP: Better error handling 2023-02-16 14:55:03 +01:00
3bae6ef40e Removed unused messages 2023-02-16 14:54:49 +01:00
0722a26fb3 Removed deprecated shit 2023-02-16 14:54:29 +01:00
8 changed files with 227 additions and 110 deletions

View File

@@ -1,16 +1,29 @@
from typing import Literal from typing import Literal
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
class AlbumNotFoundError(Exception): class AlbumNotFoundError(Exception):
def __init__(self, id: str): def __init__(self, id: str):
self.id = id self.id = id
self.openapi = { self.openapi = {
"description": "Album does not exist", "description": "Album Does Not Exist",
"content": { "content": {
"application/json": { "application/json": {
"example": { "example": {
"detail": "Could not find an album with id '{id}'." "detail": "Could not find album with id '{id}'."
}
}
}
}
class AlbumNameNotFoundError(Exception):
def __init__(self, name: str):
self.name = name
self.openapi = {
"description": "Album Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find album with name '{name}'."
} }
} }
} }
@@ -20,7 +33,7 @@ class AlbumAlreadyExistsError(Exception):
def __init__(self, name: str): def __init__(self, name: str):
self.name = name self.name = name
self.openapi = { self.openapi = {
"description": "Album already exists", "description": "Album Already Exists",
"content": { "content": {
"application/json": { "application/json": {
"example": { "example": {
@@ -35,7 +48,7 @@ class AlbumIncorrectError(Exception):
self.place = place self.place = place
self.error = error self.error = error
self.openapi = { self.openapi = {
"description": "Album name/title invalid", "description": "Album Name/Title Invalid",
"content": { "content": {
"application/json": { "application/json": {
"example": { "example": {
@@ -43,4 +56,83 @@ class AlbumIncorrectError(Exception):
} }
} }
} }
}
class PhotoNotFoundError(Exception):
def __init__(self, id: str):
self.id = id
self.openapi = {
"description": "Photo Does Not Exist",
"content": {
"application/json": {
"example": {
"detail": "Could not find photo with id '{id}'."
}
}
}
}
class SearchPageInvalidError(Exception):
def __init__(self):
self.openapi = {
"description": "Invalid Page",
"content": {
"application/json": {
"example": {
"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."
}
}
}
}
class SearchTokenInvalidError(Exception):
def __init__(self):
self.openapi = {
"description": "Invalid Token",
"content": {
"application/json": {
"example": {
"detail": "Invalid search token."
}
}
}
}
class UserEmailCodeInvalid(Exception):
def __init__(self):
self.openapi = {
"description": "Invalid Email Code",
"content": {
"application/json": {
"example": {
"detail": "Confirmation code is invalid."
}
}
}
}
class UserAlreadyExists(Exception):
def __init__(self):
self.openapi = {
"description": "User Already Exists",
"content": {
"application/json": {
"example": {
"detail": "User with this username already exists."
}
}
}
}
class UserCredentialsInvalid(Exception):
def __init__(self):
self.openapi = {
"description": "Invalid Credentials",
"content": {
"application/json": {
"example": {
"detail": "Invalid credentials."
}
}
}
} }

View File

@@ -1,4 +1,4 @@
from typing import Dict, List, Literal, Union from typing import List, Union
from pydantic import BaseModel from pydantic import BaseModel
class Photo(BaseModel): class Photo(BaseModel):
@@ -7,6 +7,11 @@ class Photo(BaseModel):
hash: str hash: str
filename: str filename: str
class PhotoPublic(BaseModel):
id: str
caption: str
filename: str
class PhotoSearch(BaseModel): class PhotoSearch(BaseModel):
id: str id: str
filename: str filename: str

View File

@@ -7,15 +7,7 @@
"password": null "password": null
}, },
"messages": { "messages": {
"key_expired": "API key expired", "email_confirmed": "Email confirmed. You can now log in."
"key_invalid": "Invalid API key",
"key_valid": "Valid API key",
"bad_request": "Bad request. Read the docs at photos.end-play.xyz/docs",
"ip_blacklisted": "Your IP is blacklisted. Make sure you are using correct API address.",
"credentials_invalid": "Incorrect user or password",
"user_already_exists": "User with this username already exists.",
"email_confirmed": "Email confirmed. You can now log in.",
"email_code_invalid": "Confirmation code is invalid."
}, },
"external_address": "localhost", "external_address": "localhost",
"registration_enabled": true, "registration_enabled": true,

View File

@@ -1,12 +1,12 @@
from modules.app import app from modules.app import app
from classes.exceptions import * from classes.exceptions import *
from starlette.status import HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT from starlette.status import HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE, HTTP_409_CONFLICT
@app.exception_handler(AlbumNotFoundError) @app.exception_handler(AlbumNotFoundError)
async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError): async def album_not_found_exception_handler(request: Request, exc: AlbumNotFoundError):
return JSONResponse( return JSONResponse(
status_code=HTTP_404_NOT_FOUND, status_code=HTTP_404_NOT_FOUND,
content={"detail": f"Could not find an album with id '{exc.id}'."}, content={"detail": f"Could not find album with id '{exc.id}'."},
) )
@app.exception_handler(AlbumAlreadyExistsError) @app.exception_handler(AlbumAlreadyExistsError)
@@ -21,4 +21,46 @@ async def album_incorrect_exception_handler(request: Request, exc: AlbumIncorrec
return JSONResponse( return JSONResponse(
status_code=HTTP_406_NOT_ACCEPTABLE, status_code=HTTP_406_NOT_ACCEPTABLE,
content={"detail": f"Album {exc.place} invalid: {exc.error}"}, content={"detail": f"Album {exc.place} invalid: {exc.error}"},
)
@app.exception_handler(PhotoNotFoundError)
async def photo_not_found_exception_handler(request: Request, exc: PhotoNotFoundError):
return JSONResponse(
status_code=HTTP_404_NOT_FOUND,
content={"detail": f"Could not find photo with id '{exc.id}'."},
)
@app.exception_handler(SearchPageInvalidError)
async def search_page_invalid_exception_handler(request: Request, exc: SearchPageInvalidError):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."},
)
@app.exception_handler(SearchTokenInvalidError)
async def search_token_invalid_exception_handler(request: Request, exc: SearchTokenInvalidError):
return JSONResponse(
status_code=HTTP_401_UNAUTHORIZED,
content={"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."},
)
@app.exception_handler(UserEmailCodeInvalid)
async def user_email_code_invalid_exception_handler(request: Request, exc: UserEmailCodeInvalid):
return JSONResponse(
status_code=HTTP_400_BAD_REQUEST,
content={"detail": "Confirmation code is invalid."},
)
@app.exception_handler(UserAlreadyExists)
async def user_already_exists_exception_handler(request: Request, exc: UserAlreadyExists):
return JSONResponse(
status_code=HTTP_409_CONFLICT,
content={"detail": "User with this username already exists."},
)
@app.exception_handler(UserCredentialsInvalid)
async def user_credentials_invalid_exception_handler(request: Request, exc: UserCredentialsInvalid):
return JSONResponse(
status_code=HTTP_401_UNAUTHORIZED,
content={"detail": "Invalid credentials."},
) )

View File

@@ -3,11 +3,12 @@ import pickle
from secrets import token_urlsafe from secrets import token_urlsafe
from shutil import move from shutil import move
from threading import Thread from threading import Thread
from typing import List, Union from typing import Union
from magic import Magic from magic import Magic
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system from os import makedirs, path, remove, system
from classes.models import Photo, SearchResultsPhoto from classes.exceptions import AlbumNameNotFoundError, PhotoNotFoundError, SearchPageInvalidError, SearchTokenInvalidError
from classes.models import Photo, PhotoPublic, SearchResultsPhoto
from modules.exif_reader import extract_location from modules.exif_reader import extract_location
from modules.hasher import get_phash, get_duplicates from modules.hasher import get_phash, get_duplicates
from modules.scheduler import scheduler from modules.scheduler import scheduler
@@ -21,7 +22,7 @@ from plum.exceptions import UnpackError
from fastapi import HTTPException, UploadFile, Security from fastapi import HTTPException, UploadFile, Security
from fastapi.responses import UJSONResponse, Response from fastapi.responses import UJSONResponse, Response
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_404_NOT_FOUND, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY from starlette.status import HTTP_204_NO_CONTENT, HTTP_409_CONFLICT, HTTP_422_UNPROCESSABLE_ENTITY
from modules.utils import logWrite from modules.utils import logWrite
@@ -51,11 +52,28 @@ async def compress_image(image_path: str):
size_after = path.getsize(image_path) / 1024 size_after = path.getsize(image_path) / 1024
logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb") logWrite(f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb")
@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo) photo_post_reponses = {
404: AlbumNameNotFoundError("name").openapi,
409: {
"description": "Image Duplicates Found",
"content": {
"application/json": {
"example": {
"detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
"duplicates": [
"string"
]
}
}
}
}
}
@app.post("/albums/{album}/photos", description="Upload a photo to album", response_class=UJSONResponse, response_model=Photo, responses=photo_post_reponses)
async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = False, compress: bool = True, caption: Union[str, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") raise AlbumNameNotFoundError(album)
# raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.")
# if not file.content_type.startswith("image"): # if not file.content_type.startswith("image"):
# raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.") # raise HTTPException(status_code=HTTP_406_NOT_ACCEPTABLE, detail="Provided file is not an image, not accepting.")
@@ -132,7 +150,7 @@ async def photo_get(id: str, current_user: User = Security(get_current_active_us
if image is None: if image is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") raise PhotoNotFoundError(id)
image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"]) image_path = path.join("data", "users", current_user.user, "albums", image["album"], image["filename"])
@@ -142,7 +160,10 @@ async def photo_get(id: str, current_user: User = Security(get_current_active_us
return Response(image_file, media_type=mime) return Response(image_file, media_type=mime)
@app.put("/photos/{id}", description="Move a photo into another album") photo_move_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.put("/photos/{id}", description="Move a photo to another album", response_model=PhotoPublic, responses=photo_move_responses)
async def photo_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): async def photo_move(id: str, album: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
try: try:
@@ -150,10 +171,10 @@ async def photo_move(id: str, album: str, current_user: User = Security(get_curr
if image is None: if image is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") raise PhotoNotFoundError(id)
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") raise AlbumNameNotFoundError(album)
if path.exists(path.join("data", "users", current_user.user, "albums", album, image["filename"])): if path.exists(path.join("data", "users", current_user.user, "albums", album, image["filename"])):
base_name = image["filename"].split(".")[:-1] base_name = image["filename"].split(".")[:-1]
@@ -172,11 +193,15 @@ async def photo_move(id: str, album: str, current_user: User = Security(get_curr
return UJSONResponse( return UJSONResponse(
{ {
"id": image["_id"].__str__(), "id": image["_id"].__str__(),
"caption": image["caption"],
"filename": filename "filename": filename
} }
) )
@app.patch("/photos/{id}", description="Change properties of a photo") photo_patch_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.patch("/photos/{id}", description="Change properties of a photo", response_model=PhotoPublic, responses=photo_patch_responses)
async def photo_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): async def photo_patch(id: str, caption: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
try: try:
@@ -184,18 +209,22 @@ async def photo_patch(id: str, caption: str, current_user: User = Security(get_c
if image is None: if image is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") raise PhotoNotFoundError(id)
col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} ) col_photos.find_one_and_update( {"_id": ObjectId(id)}, {"$set": {"caption": caption, "dates.modified": datetime.now(tz=timezone.utc)}} )
return UJSONResponse( return UJSONResponse(
{ {
"id": image["_id"].__str__(), "id": image["_id"].__str__(),
"caption": caption "caption": caption,
"filename": image["filename"]
} }
) )
@app.delete("/photos/{id}", description="Delete a photo by id", status_code=HTTP_204_NO_CONTENT) photo_delete_responses = {
404: PhotoNotFoundError("id").openapi
}
@app.delete("/photos/{id}", description="Delete a photo by id", status_code=HTTP_204_NO_CONTENT, responses=photo_delete_responses)
async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])): async def photo_delete(id: str, current_user: User = Security(get_current_active_user, scopes=["photos.write"])):
try: try:
@@ -203,7 +232,7 @@ async def photo_delete(id: str, current_user: User = Security(get_current_active
if image is None: if image is None:
raise InvalidId(id) raise InvalidId(id)
except InvalidId: except InvalidId:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail="Could not find an image with such id.") raise PhotoNotFoundError(id)
album = col_albums.find_one( {"name": image["album"]} ) album = col_albums.find_one( {"name": image["album"]} )
@@ -214,14 +243,18 @@ async def photo_delete(id: str, current_user: User = Security(get_current_active
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto) photo_find_reponses = {
400: SearchPageInvalidError().openapi,
404: AlbumNameNotFoundError("name").openapi
}
@app.get("/albums/{album}/photos", description="Find a photo by filename", response_class=UJSONResponse, response_model=SearchResultsPhoto, responses=photo_find_reponses)
async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])): async def photo_find(album: str, q: Union[str, None] = None, caption: Union[str, None] = None, page: int = 1, page_size: int = 100, lat: Union[float, None] = None, lng: Union[float, None] = None, radius: Union[int, None] = None, current_user: User = Security(get_current_active_user, scopes=["photos.list"])):
if col_albums.find_one( {"user": current_user.user, "name": album} ) is None: if col_albums.find_one( {"user": current_user.user, "name": album} ) is None:
raise HTTPException(status_code=HTTP_404_NOT_FOUND, detail=f"Provided album '{album}' does not exist.") raise AlbumNameNotFoundError(album)
if page <= 0 or page_size <= 0: if page <= 0 or page_size <= 0:
raise HTTPException(status_code=HTTP_400_BAD_REQUEST, detail="Parameters 'page' and 'page_size' must be greater or equal to 1.") raise SearchPageInvalidError()
output = {"results": []} output = {"results": []}
skip = (page-1)*page_size skip = (page-1)*page_size
@@ -263,6 +296,6 @@ async def photo_find_token(token: str):
found_record = col_tokens.find_one( {"token": token} ) found_record = col_tokens.find_one( {"token": token} )
if found_record is None: if found_record is None:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail="Invalid search token.") raise SearchTokenInvalidError()
return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"])) return await photo_find(q=found_record["query"], album=found_record["album"], page=found_record["page"], page_size=found_record["page_size"], current_user=pickle.loads(found_record["user"]))

View File

@@ -1,8 +1,8 @@
from datetime import timedelta from datetime import timedelta
from classes.exceptions import UserCredentialsInvalid
from modules.app import app from modules.app import app
from modules.utils import configGet
from fastapi import Depends, HTTPException from fastapi import Depends
from fastapi.security import ( from fastapi.security import (
OAuth2PasswordRequestForm, OAuth2PasswordRequestForm,
) )
@@ -14,12 +14,14 @@ from modules.security import (
create_access_token create_access_token
) )
token_post_responses = {
@app.post("/token", response_model=Token) 401: UserCredentialsInvalid().openapi
}
@app.post("/token", response_model=Token, responses=token_post_responses)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password) user = authenticate_user(form_data.username, form_data.password)
if not user: if not user:
raise HTTPException(status_code=400, detail=configGet("credentials_invalid", "messages")) raise UserCredentialsInvalid()
access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS) access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
access_token = create_access_token( access_token = create_access_token(
data={"sub": user.user, "scopes": form_data.scopes}, data={"sub": user.user, "scopes": form_data.scopes},

View File

@@ -1,4 +1,5 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
from classes.exceptions import UserAlreadyExists, UserCredentialsInvalid, UserEmailCodeInvalid
from modules.database import col_users, col_albums, col_photos, col_emails, col_videos, col_emails from modules.database import col_users, col_albums, col_photos, col_emails, col_videos, col_emails
from modules.app import app from modules.app import app
from modules.utils import configGet, logWrite from modules.utils import configGet, logWrite
@@ -6,9 +7,9 @@ from modules.scheduler import scheduler
from modules.mailer import mail_sender from modules.mailer import mail_sender
from uuid import uuid1 from uuid import uuid1
from fastapi import Depends, HTTPException, Form from fastapi import Depends, Form
from fastapi.responses import Response, UJSONResponse from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_406_NOT_ACCEPTABLE from starlette.status import HTTP_204_NO_CONTENT
from modules.security import ( from modules.security import (
User, User,
@@ -38,35 +39,43 @@ async def user_me(current_user: User = Depends(get_current_active_user)):
return current_user return current_user
user_confirm_responses = {
400: UserEmailCodeInvalid().openapi
}
if configGet("registration_requires_confirmation") is True: if configGet("registration_requires_confirmation") is True:
@app.get("/users/{user}/confirm") @app.get("/users/{user}/confirm", responses=user_confirm_responses)
@app.patch("/users/{user}/confirm") @app.patch("/users/{user}/confirm", responses=user_confirm_responses)
async def user_confirm(user: str, code: str): async def user_confirm(user: str, code: str):
confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} ) confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} )
if confirm_record is None: if confirm_record is None:
raise HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("email_code_invalid", "messages")) raise UserEmailCodeInvalid()
col_emails.find_one_and_update( {"_id": confirm_record["_id"]}, {"$set": {"used": True}} ) col_emails.find_one_and_update( {"_id": confirm_record["_id"]}, {"$set": {"used": True}} )
col_users.find_one_and_update( {"user": confirm_record["user"]}, {"$set": {"disabled": False}} ) col_users.find_one_and_update( {"user": confirm_record["user"]}, {"$set": {"disabled": False}} )
return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} ) return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} )
user_create_responses = {
409: UserAlreadyExists().openapi
}
if configGet("registration_enabled") is True: if configGet("registration_enabled") is True:
@app.post("/users", status_code=HTTP_204_NO_CONTENT) @app.post("/users", status_code=HTTP_204_NO_CONTENT, responses=user_create_responses)
async def user_create(user: str = Form(), email: str = Form(), password: str = Form()): async def user_create(user: str = Form(), email: str = Form(), password: str = Form()):
if col_users.find_one( {"user": user} ) is not None: if col_users.find_one( {"user": user} ) is not None:
raise HTTPException(HTTP_406_NOT_ACCEPTABLE, detail=configGet("user_already_exists", "messages")) raise UserAlreadyExists()
col_users.insert_one( {"user": user, "email": email, "hash": get_password_hash(password), "disabled": configGet("registration_requires_confirmation")} ) col_users.insert_one( {"user": user, "email": email, "hash": get_password_hash(password), "disabled": configGet("registration_requires_confirmation")} )
if configGet("registration_requires_confirmation") is True: if configGet("registration_requires_confirmation") is True:
scheduler.add_job( send_confirmation, trigger="date", run_date=datetime.now()+timedelta(seconds=1), kwargs={"user": user, "email": email} ) scheduler.add_job( send_confirmation, trigger="date", run_date=datetime.now()+timedelta(seconds=1), kwargs={"user": user, "email": email} )
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)
user_delete_responses = {
@app.delete("/users/me/", status_code=HTTP_204_NO_CONTENT) 401: UserCredentialsInvalid().openapi
}
@app.delete("/users/me/", status_code=HTTP_204_NO_CONTENT, responses=user_delete_responses)
async def user_delete(password: str = Form(), current_user: User = Depends(get_current_active_user)): async def user_delete(password: str = Form(), current_user: User = Depends(get_current_active_user)):
user = get_user(current_user.user) user = get_user(current_user.user)
if not user: if not user:
return False return False
if not verify_password(password, user.hash): if not verify_password(password, user.hash):
raise HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("credentials_invalid", "messages")) raise UserCredentialsInvalid()
col_users.delete_many( {"user": current_user.user} ) col_users.delete_many( {"user": current_user.user} )
col_emails.delete_many( {"user": current_user.user} ) col_emails.delete_many( {"user": current_user.user} )
col_photos.delete_many( {"user": current_user.user} ) col_photos.delete_many( {"user": current_user.user} )

View File

@@ -1,67 +1,9 @@
from os import sep from fastapi import FastAPI
from fastapi import FastAPI, Security, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN
from fastapi.security import APIKeyQuery, APIKeyHeader, APIKeyCookie
from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html from fastapi.openapi.docs import get_swagger_ui_html, get_redoc_html
from starlette.status import HTTP_401_UNAUTHORIZED
from fastapi.openapi.models import APIKey
from modules.utils import configGet, jsonLoad
app = FastAPI(title="END PLAY Photos", docs_url=None, redoc_url=None, version="0.1") app = FastAPI(title="END PLAY Photos", docs_url=None, redoc_url=None, version="0.1")
api_key_query = APIKeyQuery(name="apikey", auto_error=False)
api_key_header = APIKeyHeader(name="apikey", auto_error=False)
api_key_cookie = APIKeyCookie(name="apikey", auto_error=False)
def get_all_api_keys():
return jsonLoad(f'{configGet("data_location")}{sep}api_keys.json')
def get_all_expired_keys():
return jsonLoad(f'{configGet("data_location")}{sep}expired_keys.json')
def check_project_key(project: str, apikey: APIKey) -> bool:
keys = jsonLoad(f'{configGet("data_location")}{sep}api_keys.json')
if apikey in keys:
if keys[apikey] != []:
if project in keys[apikey]:
return True
else:
return False
else:
return False
else:
return False
async def get_api_key(
api_key_query: str = Security(api_key_query),
api_key_header: str = Security(api_key_header),
api_key_cookie: str = Security(api_key_cookie),
):
keys = get_all_api_keys()
expired = get_all_expired_keys()
def is_valid(key):
if (key in keys) or (key == "publickey"):
return True
else:
return False
if is_valid(api_key_query):
return api_key_query
elif is_valid(api_key_header):
return api_key_header
elif is_valid(api_key_cookie):
return api_key_cookie
else:
if (api_key_query in expired) or (api_key_header in expired) or (api_key_cookie in expired):
raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail=configGet("key_expired", "messages"))
else:
raise HTTPException(status_code=HTTP_401_UNAUTHORIZED, detail=configGet("key_invalid", "messages"))
@app.get("/docs", include_in_schema=False) @app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html(): async def custom_swagger_ui_html():
return get_swagger_ui_html( return get_swagger_ui_html(