PhotosAPI/modules/hasher.py

76 lines
2.5 KiB
Python
Raw Permalink Normal View History

2023-06-23 13:09:36 +03:00
from pathlib import Path
2023-08-14 14:44:07 +03:00
from typing import Any, List, Mapping, Union
2023-06-23 13:09:36 +03:00
2023-06-22 14:17:53 +03:00
import cv2
2022-12-20 02:22:32 +02:00
import numpy as np
from numpy.typing import NDArray
from scipy import spatial
2023-06-22 14:17:53 +03:00
from modules.database import col_photos
2022-12-20 02:22:32 +02:00
2023-03-12 15:59:13 +02:00
2023-08-14 14:44:07 +03:00
def hash_array_to_hash_hex(hash_array) -> str:
    """Encode a sequence of 0/1 values as a hexadecimal string.

    The input is cast to ``uint8``, flattened, and read as one binary
    number with the first element as the most significant bit.

    Args:
        hash_array: Array-like of zeros and ones; any shape is accepted.

    Returns:
        The ``hex()`` representation of that number (``"0x"`` prefixed).
        Leading zero bits are not represented in the result.

    Raises:
        ValueError: If the input is empty or holds values other than 0/1.
    """
    bits = np.asarray(hash_array, dtype=np.uint8).ravel()
    # Build the textual binary form, e.g. [1, 0, 1] -> "101".
    bit_string = "".join(map(str, bits.tolist()))
    return hex(int(bit_string, 2))
2022-12-20 02:22:32 +02:00
def hash_hex_to_hash_array(hash_hex, bits: int = 64) -> NDArray:
    """Decode a hexadecimal hash string into an array of 0.0/1.0 values.

    ``hex()`` and ``bin()`` both drop leading zeros, so a hash whose most
    significant bits are zero would otherwise decode to a shorter array
    than the one it was built from. The result is therefore zero-padded
    on the left to at least ``bits`` elements, which keeps arrays of
    different hashes the same length and directly comparable.

    Args:
        hash_hex: Hash as a hexadecimal string (with or without ``0x``).
        bits: Minimum number of elements in the result. The default of 64
            matches the 8x8 block hashed by ``get_phash``. Hashes wider
            than this are returned in full, never truncated.

    Returns:
        One-dimensional ``float32`` array, most significant bit first.

    Raises:
        ValueError: If ``hash_hex`` is not a valid non-negative hex number.
    """
    value = int(hash_hex, 16)
    # zfill restores the leading zeros that the integer round trip lost.
    bit_string = format(value, "b").zfill(bits)
    return np.array(list(bit_string), dtype=np.float32)
2023-03-12 15:59:13 +02:00
2022-12-20 02:22:32 +02:00
2023-08-14 14:44:07 +03:00
async def get_duplicates_cache(album: str) -> Mapping[str, Any]:
    """Collect the id and hash of every photo stored for an album.

    Args:
        album: Value matched against the ``album`` field of the photo
            documents.

    Returns:
        Mapping of photo filename to a two-item list
        ``[stringified _id, hash]``. If several documents share a
        filename, the last one iterated wins.
    """
    cache = {}
    cursor = col_photos.find({"album": album})
    async for photo in cursor:
        filename = photo["filename"]
        cache[filename] = [str(photo["_id"]), photo["hash"]]
    return cache
2022-12-20 02:22:32 +02:00
2023-03-12 15:59:13 +02:00
2023-06-23 13:09:36 +03:00
async def get_phash(filepath: Union[str, Path]) -> str:
    """Compute a DCT-based perceptual hash of an image file.

    The image is read with OpenCV, resized to 64x64, converted to
    grayscale and run through a 2-D DCT. Only the top-left 8x8 block of
    coefficients is kept, and each coefficient contributes one hash bit.

    Args:
        filepath: Location of the image to hash.

    Returns:
        The hash as a hex string, as built by ``hash_array_to_hash_hex``.
    """
    # NOTE(review): cv2.imread is documented to return an empty result
    # instead of raising when a file cannot be read; the failure would then
    # surface inside cv2.resize. Confirm callers handle that error.
    resized = cv2.resize(cv2.imread(str(filepath)), (64, 64))
    gray = cv2.cvtColor(resized, cv2.COLOR_BGR2GRAY)

    # Calculate the DCT of the grayscale image (float32 input).
    coefficients = cv2.dct(np.array(gray, dtype=np.float32))

    # To keep the hash short, use only the top-left 8x8 block, which
    # carries more information than the rest.
    low_freq = coefficients[:8, :8]

    # Calculate the mean of the block excluding its first term, dct(0, 0).
    count = low_freq.size
    threshold = (low_freq.mean() * count - low_freq[0, 0]) / (count - 1)

    # A bit is set when its coefficient is not below the threshold and is
    # not exactly zero; every other coefficient yields a 0 bit.
    not_below = ~(low_freq < threshold)
    bits = not_below & (low_freq != 0)
    return hash_array_to_hash_hex(bits.flatten())
2023-03-12 15:59:13 +02:00
2023-08-14 14:44:07 +03:00
async def get_duplicates(hash_string: str, album: str) -> List[Mapping[str, Any]]:
    """Find photos in an album whose hash is close to the given one.

    Every photo returned by ``get_duplicates_cache`` is compared with
    ``hash_string`` using the normalized Hamming distance between their
    bit arrays. Photos at a distance of 0.1 or less are reported.

    Args:
        hash_string: Hexadecimal hash of the image being checked.
        album: Album whose stored photos are searched.

    Returns:
        A list of dicts with the keys ``id``, ``filename`` and
        ``difference`` (the Hamming distance), one per near-duplicate.
        The list is empty if ``hash_string`` is not valid hexadecimal.
    """
    # The query hash is the same for every comparison, so decode it once.
    try:
        target = hash_hex_to_hash_array(hash_string)
    except ValueError:
        # Nothing can be compared against an unparsable hash.
        return []

    duplicates = []
    cache = await get_duplicates_cache(album)
    for image_name, (photo_id, stored_hash) in cache.items():
        try:
            candidate = hash_hex_to_hash_array(stored_hash)
        except ValueError:
            # Skip photos whose stored hash is not valid hexadecimal.
            continue

        # Leading zero bits are lost in the hex form, so two arrays may
        # differ in length. Left-pad the shorter one with zeros so such
        # photos are compared instead of being skipped.
        width = max(candidate.size, target.size)
        distance = spatial.distance.hamming(
            np.pad(candidate, (width - candidate.size, 0)),
            np.pad(target, (width - target.size, 0)),
        )

        if distance <= 0.1:
            duplicates.append(
                {
                    "id": photo_id,
                    "filename": image_name,
                    "difference": distance,
                }
            )
    return duplicates