2025-02-09 23:37:44 +01:00
|
|
|
from copy import deepcopy
|
2025-02-09 23:00:18 +01:00
|
|
|
from typing import Dict, Any, Literal
|
|
|
|
|
2025-02-09 23:37:44 +01:00
|
|
|
from bson import ObjectId
|
|
|
|
from ujson import dumps, loads
|
|
|
|
|
2025-02-09 23:00:18 +01:00
|
|
|
from classes.cache.holo_cache_memcached import HoloCacheMemcached
|
|
|
|
from classes.cache.holo_cache_redis import HoloCacheRedis
|
|
|
|
|
|
|
|
|
|
|
|
def create_cache_client(
|
|
|
|
config: Dict[str, Any],
|
|
|
|
engine: Literal["memcached", "redis"] | None = None,
|
|
|
|
) -> HoloCacheMemcached | HoloCacheRedis:
|
|
|
|
if engine not in ["memcached", "redis"] or engine is None:
|
|
|
|
raise KeyError(
|
|
|
|
f"Incorrect cache engine provided. Expected 'memcached' or 'redis', got '{engine}'"
|
|
|
|
)
|
|
|
|
|
|
|
|
if "cache" not in config or engine not in config["cache"]:
|
|
|
|
raise KeyError(
|
|
|
|
f"Cache configuration is invalid. Please check if all keys are set (engine: '{engine}')"
|
|
|
|
)
|
|
|
|
|
|
|
|
match engine:
|
|
|
|
case "memcached":
|
|
|
|
return HoloCacheMemcached.from_config(config["cache"][engine])
|
|
|
|
case "redis":
|
|
|
|
return HoloCacheRedis.from_config(config["cache"][engine])
|
|
|
|
case _:
|
|
|
|
raise KeyError(
|
|
|
|
f"Cache implementation for the engine '{engine}' is not present."
|
|
|
|
)
|
2025-02-09 23:37:44 +01:00
|
|
|
|
|
|
|
|
|
|
|
def json_to_string(json_object: Any) -> str:
|
|
|
|
json_object_copy: Any = deepcopy(json_object)
|
|
|
|
|
|
|
|
if isinstance(json_object_copy, dict) and "_id" in json_object_copy:
|
|
|
|
json_object_copy["_id"] = str(json_object_copy["_id"])
|
|
|
|
|
|
|
|
return dumps(
|
|
|
|
json_object_copy, ensure_ascii=False, indent=0, escape_forward_slashes=False
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
def string_to_json(json_string: str) -> Any:
|
|
|
|
json_object: Any = loads(json_string)
|
|
|
|
|
|
|
|
if "_id" in json_object:
|
|
|
|
json_object["_id"] = ObjectId(json_object["_id"])
|
|
|
|
|
|
|
|
return json_object
|