Refactor changed are done

This commit is contained in:
Profitroll 2023-06-23 12:17:01 +02:00
parent b003712358
commit 2a7870620c
Signed by: profitroll
GPG Key ID: FA35CAB49DACD3B2
9 changed files with 36 additions and 39 deletions

View File

@ -108,12 +108,12 @@ async def album_patch(
except InvalidId: except InvalidId:
raise AlbumNotFoundError(id) raise AlbumNotFoundError(id)
if title is not None: if title is None:
if 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
else:
title = album["title"] title = album["title"]
elif 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
if name is not None: if name is not None:
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False: if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError( raise AlbumIncorrectError(

View File

@ -139,11 +139,9 @@ async def photo_upload(
) )
duplicates = await get_duplicates(file_hash, album) duplicates = await get_duplicates(file_hash, album)
if len(duplicates) > 0 and ignore_duplicates is False: if len(duplicates) > 0 and not ignore_duplicates:
if configGet("media_token_access") is True: if configGet("media_token_access") is True:
duplicates_ids = [] duplicates_ids = [entry["id"] for entry in duplicates]
for entry in duplicates:
duplicates_ids.append(entry["id"])
access_token = create_access_token( access_token = create_access_token(
data={ data={
"sub": current_user.user, "sub": current_user.user,
@ -193,7 +191,7 @@ async def photo_upload(
} }
) )
if compress is True: if compress:
scheduler.add_job( scheduler.add_job(
compress_image, compress_image,
trigger="date", trigger="date",
@ -519,7 +517,7 @@ async def photo_find(
} }
elif q is None and caption is None: elif q is None and caption is None:
raise PhotoSearchQueryEmptyError() raise PhotoSearchQueryEmptyError()
elif q is None and caption is not None: elif q is None:
db_query = { db_query = {
"user": current_user.user, "user": current_user.user,
"album": album, "album": album,
@ -530,7 +528,7 @@ async def photo_find(
"album": album, "album": album,
"caption": re.compile(caption), "caption": re.compile(caption),
} }
elif q is not None and caption is None: elif caption is None:
db_query = { db_query = {
"user": current_user.user, "user": current_user.user,
"album": album, "album": album,

View File

@ -313,7 +313,7 @@ async def video_find(
if q is None and caption is None: if q is None and caption is None:
raise VideoSearchQueryEmptyError() raise VideoSearchQueryEmptyError()
if q is None and caption is not None: if q is None:
db_query = { db_query = {
"user": current_user.user, "user": current_user.user,
"album": album, "album": album,
@ -324,7 +324,7 @@ async def video_find(
"album": album, "album": album,
"caption": re.compile(caption), "caption": re.compile(caption),
} }
elif q is not None and caption is None: elif caption is None:
db_query = list( db_query = list(
col_videos.find( col_videos.find(
{"user": current_user.user, "album": album, "filename": re.compile(q)}, {"user": current_user.user, "album": album, "filename": re.compile(q)},

View File

@ -7,8 +7,8 @@ app = FastAPI(title="END PLAY Photos", docs_url=None, redoc_url=None, version="0
@app.get("/docs", include_in_schema=False) @app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html(): async def custom_swagger_ui_html():
return get_swagger_ui_html( return get_swagger_ui_html(
openapi_url=app.openapi_url, # type: ignore openapi_url=app.openapi_url,
title=app.title + " - Documentation", title=f"{app.title} - Documentation",
swagger_favicon_url="/favicon.ico", swagger_favicon_url="/favicon.ico",
) )
@ -16,7 +16,7 @@ async def custom_swagger_ui_html():
@app.get("/redoc", include_in_schema=False) @app.get("/redoc", include_in_schema=False)
async def custom_redoc_html(): async def custom_redoc_html():
return get_redoc_html( return get_redoc_html(
openapi_url=app.openapi_url, # type: ignore openapi_url=app.openapi_url,
title=app.title + " - Documentation", title=f"{app.title} - Documentation",
redoc_favicon_url="/favicon.ico", redoc_favicon_url="/favicon.ico",
) )

View File

@ -24,7 +24,7 @@ db = db_client.get_database(name=db_config["name"])
collections = db.list_collection_names() collections = db.list_collection_names()
for collection in ["users", "albums", "photos", "videos", "tokens", "emails"]: for collection in ["users", "albums", "photos", "videos", "tokens", "emails"]:
if not collection in collections: if collection not in collections:
db.create_collection(collection) db.create_collection(collection)
col_users = db.get_collection("users") col_users = db.get_collection("users")

View File

@ -1,3 +1,5 @@
import contextlib
from exif import Image from exif import Image
@ -12,8 +14,10 @@ def decimal_coords(coords: float, ref: str) -> float:
* float: Decimal degrees * float: Decimal degrees
""" """
decimal_degrees = coords[0] + coords[1] / 60 + coords[2] / 3600 decimal_degrees = coords[0] + coords[1] / 60 + coords[2] / 3600
if ref == "S" or ref == "W":
if ref in {"S", "W"}:
decimal_degrees = -decimal_degrees decimal_degrees = -decimal_degrees
return round(decimal_degrees, 5) return round(decimal_degrees, 5)
@ -35,11 +39,9 @@ def extract_location(filepath: str) -> dict:
if img.has_exif is False: if img.has_exif is False:
return output return output
try: with contextlib.suppress(AttributeError):
output["lng"] = decimal_coords(img.gps_longitude, img.gps_longitude_ref) output["lng"] = decimal_coords(img.gps_longitude, img.gps_longitude_ref)
output["lat"] = decimal_coords(img.gps_latitude, img.gps_latitude_ref) output["lat"] = decimal_coords(img.gps_latitude, img.gps_latitude_ref)
output["alt"] = img.gps_altitude output["alt"] = img.gps_altitude
except AttributeError:
pass
return output return output

View File

@ -11,9 +11,9 @@ def get_py_files(src):
cwd = getcwd() # Current Working directory cwd = getcwd() # Current Working directory
py_files = [] py_files = []
for root, dirs, files in walk(src): for root, dirs, files in walk(src):
for file in files: py_files.extend(
if file.endswith(".py"): Path(f"{cwd}/{root}/{file}") for file in files if file.endswith(".py")
py_files.append(Path(f"{cwd}/{root}/{file}")) )
return py_files return py_files

View File

@ -20,14 +20,14 @@ def hash_hex_to_hash_array(hash_hex) -> NDArray:
# convert hash string in hex to hash values of 0 or 1 # convert hash string in hex to hash values of 0 or 1
hash_str = int(hash_hex, 16) hash_str = int(hash_hex, 16)
array_str = bin(hash_str)[2:] array_str = bin(hash_str)[2:]
return np.array([i for i in array_str], dtype=np.float32) return np.array(list(array_str), dtype=np.float32)
def get_duplicates_cache(album: str) -> dict: def get_duplicates_cache(album: str) -> dict:
output = {} return {
for photo in col_photos.find({"album": album}): photo["filename"]: [photo["_id"].__str__(), photo["hash"]]
output[photo["filename"]] = [photo["_id"].__str__(), photo["hash"]] for photo in col_photos.find({"album": album})
return output }
async def get_phash(filepath: Union[str, Path]) -> str: async def get_phash(filepath: Union[str, Path]) -> str:
@ -55,7 +55,7 @@ async def get_phash(filepath: Union[str, Path]) -> str:
async def get_duplicates(hash_string: str, album: str) -> list: async def get_duplicates(hash_string: str, album: str) -> list:
duplicates = [] duplicates = []
cache = get_duplicates_cache(album) cache = get_duplicates_cache(album)
for image_name, image_value in cache.items(): for image_name, image_object in cache.items():
try: try:
distance = spatial.distance.hamming( distance = spatial.distance.hamming(
hash_hex_to_hash_array(cache[image_name][1]), hash_hex_to_hash_array(cache[image_name][1]),

View File

@ -73,12 +73,10 @@ def get_user(user: str):
def authenticate_user(user_name: str, password: str): def authenticate_user(user_name: str, password: str):
user = get_user(user_name) if user := get_user(user_name):
if not user: return user if verify_password(password, user.hash) else False
else:
return False return False
if not verify_password(password, user.hash):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
@ -89,9 +87,8 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
expire = datetime.now(tz=timezone.utc) + timedelta( expire = datetime.now(tz=timezone.utc) + timedelta(
days=ACCESS_TOKEN_EXPIRE_DAYS days=ACCESS_TOKEN_EXPIRE_DAYS
) )
to_encode.update({"exp": expire}) to_encode["exp"] = expire
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_user( async def get_current_user(