Random media requests #18

Merged
profitroll merged 5 commits from dev into master 2023-06-27 14:54:29 +03:00
9 changed files with 43 additions and 43 deletions
Showing only changes of commit b3c9a972c8 - Show all commits

View File

@ -108,12 +108,12 @@ async def album_patch(
except InvalidId:
raise AlbumNotFoundError(id)
if title is not None:
if 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
else:
if title is None:
title = album["title"]
elif 2 > len(title) > 40:
raise AlbumIncorrectError("title", "must be >2 and <40 characters.")
if name is not None:
if re.search(re.compile("^[a-z,0-9,_]*$"), name) is False:
raise AlbumIncorrectError(

View File

@ -146,11 +146,9 @@ async def photo_upload(
)
duplicates = await get_duplicates(file_hash, album)
if len(duplicates) > 0 and ignore_duplicates is False:
if len(duplicates) > 0 and not ignore_duplicates:
if configGet("media_token_access") is True:
duplicates_ids = []
for entry in duplicates:
duplicates_ids.append(entry["id"])
duplicates_ids = [entry["id"] for entry in duplicates]
access_token = create_access_token(
data={
"sub": current_user.user,
@ -200,7 +198,7 @@ async def photo_upload(
}
)
if compress is True:
if compress:
scheduler.add_job(
compress_image,
trigger="date",
@ -591,7 +589,7 @@ async def photo_find(
}
elif q is None and caption is None:
raise PhotoSearchQueryEmptyError()
elif q is None and caption is not None:
elif q is None:
db_query = {
"user": current_user.user,
"album": album,
@ -602,7 +600,7 @@ async def photo_find(
"album": album,
"caption": re.compile(caption),
}
elif q is not None and caption is None:
elif caption is None:
db_query = {
"user": current_user.user,
"album": album,

View File

@ -385,7 +385,7 @@ async def video_find(
if q is None and caption is None:
raise VideoSearchQueryEmptyError()
if q is None and caption is not None:
if q is None:
db_query = {
"user": current_user.user,
"album": album,
@ -396,7 +396,7 @@ async def video_find(
"album": album,
"caption": re.compile(caption),
}
elif q is not None and caption is None:
elif caption is None:
db_query = list(
col_videos.find(
{"user": current_user.user, "album": album, "filename": re.compile(q)},

View File

@ -7,8 +7,8 @@ app = FastAPI(title="END PLAY Photos", docs_url=None, redoc_url=None, version="0
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
return get_swagger_ui_html(
openapi_url=app.openapi_url, # type: ignore
title=app.title + " - Documentation",
openapi_url=app.openapi_url,
title=f"{app.title} - Documentation",
swagger_favicon_url="/favicon.ico",
)
@ -16,7 +16,7 @@ async def custom_swagger_ui_html():
@app.get("/redoc", include_in_schema=False)
async def custom_redoc_html():
return get_redoc_html(
openapi_url=app.openapi_url, # type: ignore
title=app.title + " - Documentation",
openapi_url=app.openapi_url,
title=f"{app.title} - Documentation",
redoc_favicon_url="/favicon.ico",
)

View File

@ -24,7 +24,7 @@ db = db_client.get_database(name=db_config["name"])
collections = db.list_collection_names()
for collection in ["users", "albums", "photos", "videos", "tokens", "emails"]:
if not collection in collections:
if collection not in collections:
db.create_collection(collection)
col_users = db.get_collection("users")

View File

@ -1,3 +1,5 @@
import contextlib
from exif import Image
@ -12,8 +14,10 @@ def decimal_coords(coords: float, ref: str) -> float:
* float: Decimal degrees
"""
decimal_degrees = coords[0] + coords[1] / 60 + coords[2] / 3600
if ref == "S" or ref == "W":
if ref in {"S", "W"}:
decimal_degrees = -decimal_degrees
return round(decimal_degrees, 5)
@ -35,11 +39,9 @@ def extract_location(filepath: str) -> dict:
if img.has_exif is False:
return output
try:
with contextlib.suppress(AttributeError):
output["lng"] = decimal_coords(img.gps_longitude, img.gps_longitude_ref)
output["lat"] = decimal_coords(img.gps_latitude, img.gps_latitude_ref)
output["alt"] = img.gps_altitude
except AttributeError:
pass
return output

View File

@ -11,9 +11,9 @@ def get_py_files(src):
cwd = getcwd() # Current Working directory
py_files = []
for root, dirs, files in walk(src):
for file in files:
if file.endswith(".py"):
py_files.append(Path(f"{cwd}/{root}/{file}"))
py_files.extend(
Path(f"{cwd}/{root}/{file}") for file in files if file.endswith(".py")
)
return py_files

View File

@ -1,3 +1,6 @@
from pathlib import Path
from typing import Union
import cv2
import numpy as np
from numpy.typing import NDArray
@ -17,18 +20,18 @@ def hash_hex_to_hash_array(hash_hex) -> NDArray:
# convert hash string in hex to hash values of 0 or 1
hash_str = int(hash_hex, 16)
array_str = bin(hash_str)[2:]
return np.array([i for i in array_str], dtype=np.float32)
return np.array(list(array_str), dtype=np.float32)
def get_duplicates_cache(album: str) -> dict:
output = {}
for photo in col_photos.find({"album": album}):
output[photo["filename"]] = [photo["_id"].__str__(), photo["hash"]]
return output
return {
photo["filename"]: [photo["_id"].__str__(), photo["hash"]]
for photo in col_photos.find({"album": album})
}
async def get_phash(filepath: str) -> str:
img = cv2.imread(filepath)
async def get_phash(filepath: Union[str, Path]) -> str:
img = cv2.imread(str(filepath))
# resize image and convert to gray scale
img = cv2.resize(img, (64, 64))
img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
@ -49,14 +52,14 @@ async def get_phash(filepath: str) -> str:
return hash_array_to_hash_hex(dct_block.flatten())
async def get_duplicates(hash: str, album: str) -> list:
async def get_duplicates(hash_string: str, album: str) -> list:
duplicates = []
cache = get_duplicates_cache(album)
for image_name in cache.keys():
for image_name, image_object in cache.items():
try:
distance = spatial.distance.hamming(
hash_hex_to_hash_array(cache[image_name][1]),
hash_hex_to_hash_array(hash),
hash_hex_to_hash_array(hash_string),
)
except ValueError:
continue

View File

@ -73,12 +73,10 @@ def get_user(user: str):
def authenticate_user(user_name: str, password: str):
user = get_user(user_name)
if not user:
if user := get_user(user_name):
return user if verify_password(password, user.hash) else False
else:
return False
if not verify_password(password, user.hash):
return False
return user
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
@ -89,9 +87,8 @@ def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None
expire = datetime.now(tz=timezone.utc) + timedelta(
days=ACCESS_TOKEN_EXPIRE_DAYS
)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
to_encode["exp"] = expire
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(