from dataclasses import dataclass from datetime import datetime from logging import Logger from typing import Any, Dict, List, Optional from zoneinfo import ZoneInfo from bson import ObjectId from libbot.cache.classes import Cache from pymongo.results import InsertOneResult from modules.database import col_events, col_stages from modules.logging_utils import get_logger logger: Logger = get_logger(__name__) @dataclass class PycordEvent: __slots__ = ( "_id", "name", "guild_id", "created", "ended", "cancelled", "creator_id", "starts", "ends", "thumbnail_id", "stage_ids", ) __short_name__ = "event" __collection__ = col_events _id: ObjectId name: str guild_id: int created: datetime ended: datetime | None cancelled: bool creator_id: int starts: datetime ends: datetime thumbnail_id: str | None stage_ids: List[ObjectId] @classmethod async def from_id(cls, event_id: str | ObjectId, cache: Optional[Cache] = None) -> "PycordEvent": """Find event in the database. Args: event_id (str | ObjectId): Event's ID cache (:obj:`Cache`, optional): Cache engine to get the cache from Returns: PycordEvent: Event object Raises: EventNotFoundError: Event was not found """ if cache is not None: cached_entry: Dict[str, Any] | None = cache.get_json(f"{cls.__short_name__}_{event_id}") if cached_entry is not None: return cls(**cached_entry) db_entry = await cls.__collection__.find_one( {"_id": event_id if isinstance(event_id, ObjectId) else ObjectId(event_id)} ) if db_entry is None: raise RuntimeError(f"Event {event_id} not found") # TODO Add a unique exception # raise EventNotFoundError(event_id) if cache is not None: cache.set_json(f"{cls.__short_name__}_{event_id}", db_entry) return cls(**db_entry) @classmethod async def from_name(cls, event_name: str, cache: Optional[Cache] = None) -> "PycordEvent": # TODO Add sorting by creation date or something. # Duplicate events should be avoided, latest active event should be returned. db_entry: Dict[str, Any] | None = await cls.__collection__.find_one({"name": event_name}) if db_entry is None: raise RuntimeError(f"Event with name {event_name} not found") # TODO Add a unique exception # raise EventNotFoundError(event_name) if cache is not None: cache.set_json(f"{cls.__short_name__}_{db_entry['_id']}", db_entry) return cls(**db_entry) @classmethod async def create( cls, name: str, guild_id: int, creator_id: int, starts: datetime, ends: datetime, thumbnail_id: str | None, cache: Optional[Cache] = None, ) -> "PycordEvent": db_entry: Dict[str, Any] = { "name": name, "guild_id": guild_id, "created": datetime.now(tz=ZoneInfo("UTC")), "ended": None, "cancelled": False, "creator_id": creator_id, "starts": starts, "ends": ends, "thumbnail_id": thumbnail_id, "stage_ids": [], } insert_result: InsertOneResult = await cls.__collection__.insert_one(db_entry) db_entry["_id"] = insert_result.inserted_id if cache is not None: cache.set_json(f"{cls.__short_name__}_{guild_id}", db_entry) return cls(**db_entry) # TODO Update the docstring async def _set(self, cache: Optional[Cache] = None, **kwargs) -> None: """Set attribute data and save it into the database. Args: key (str): Attribute to change value (Any): Value to set cache (:obj:`Cache`, optional): Cache engine to write the update into """ for key, value in kwargs.items(): if not hasattr(self, key): raise AttributeError() setattr(self, key, value) await self.__collection__.update_one({"_id": self._id}, {"$set": kwargs}, upsert=True) self._update_cache(cache) logger.info("Set attributes of event %s to %s", self._id, kwargs) # TODO Update the docstring async def _remove(self, cache: Optional[Cache] = None, *args: str) -> None: """Remove attribute data and save it into the database. Args: key (str): Attribute to remove cache (:obj:`Cache`, optional): Cache engine to write the update into """ attributes: Dict[str, Any] = {} for key in args: if not hasattr(self, key): raise AttributeError() default_value: Any = self.get_default_value(key) setattr(self, key, default_value) attributes[key] = default_value await self.__collection__.update_one({"_id": self._id}, {"$set": attributes}, upsert=True) self._update_cache(cache) logger.info("Reset attributes %s of event %s to default values", args, self._id) def _get_cache_key(self) -> str: return f"{self.__short_name__}_{self._id}" def _update_cache(self, cache: Optional[Cache] = None) -> None: if cache is None: return user_dict: Dict[str, Any] = self.to_dict() if user_dict is not None: cache.set_json(self._get_cache_key(), user_dict) else: self._delete_cache(cache) def _delete_cache(self, cache: Optional[Cache] = None) -> None: if cache is None: return cache.delete(self._get_cache_key()) def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]: """Convert PycordEvent object to a JSON representation. Args: json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted Returns: Dict[str, Any]: JSON representation of PycordEvent """ return { "_id": self._id if not json_compatible else str(self._id), "name": self.name, "guild_id": self.guild_id, "created": self.created, "ended": self.ended, "cancelled": self.cancelled, "creator_id": self.creator_id, "starts": self.starts, "ends": self.ends, "thumbnail_id": self.thumbnail_id, "stage_ids": self.stage_ids, } @staticmethod def get_defaults() -> Dict[str, Any]: return { "name": None, "guild_id": None, "created": None, "ended": None, "cancelled": False, "creator_id": None, "starts": None, "ends": None, "thumbnail_id": None, "stage_ids": [], } @staticmethod def get_default_value(key: str) -> Any: if key not in PycordEvent.get_defaults(): raise KeyError(f"There's no default value for key '{key}' in PycordEvent") return PycordEvent.get_defaults()[key] # TODO Add documentation async def update( self, cache: Optional[Cache] = None, **kwargs, ): await self._set(cache=cache, **kwargs) # TODO Add documentation async def reset( self, cache: Optional[Cache] = None, *args, ): await self._remove(cache, *args) async def purge(self, cache: Optional[Cache] = None) -> None: """Completely remove event data from database. Currently only removes the event record from events collection. Args: cache (:obj:`Cache`, optional): Cache engine to write the update into """ await self.__collection__.delete_one({"_id": self._id}) self._delete_cache(cache) async def cancel(self, cache: Optional[Cache] = None): await self._set(cache, cancelled=True) async def insert_stage( self, event_stage_id: ObjectId, sequence: int, cache: Optional[Cache] = None ) -> None: self.stage_ids.insert(sequence, event_stage_id) await self._set(cache, stage_ids=self.stage_ids) # TODO Check if this works await col_stages.update_many({"_id": {"$eq": self.stage_ids[sequence:]}}, {"$inc": {"sequence": 1}})