Implemented /event add

This commit is contained in:
2025-04-18 23:34:11 +02:00
parent fd44276021
commit ec733097ea
6 changed files with 298 additions and 24 deletions

View File

@@ -1,17 +1,216 @@
from dataclasses import dataclass
from datetime import datetime
from typing import List
from datetime import datetime, timezone
from logging import Logger
from typing import Any, Dict, List, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from modules.database import col_events
from modules.logging_utils import get_logger
logger: Logger = get_logger(__name__)
@dataclass
class PycordEvent:
__slots__ = (
"_id",
"name",
"guild_id",
"created",
"creator_id",
"starts",
"ends",
"thumbnail_id",
"stage_ids",
)
__short_name__ = "event"
__collection__ = col_events
_id: ObjectId
id: int
name: str
guild_id: int
created: datetime
creator_id: int
starts: datetime
ends: datetime
thumbnail_id: str | None
stage_ids: List[int]
@classmethod
async def from_id(cls, event_id: str | ObjectId, cache: Optional[Cache] = None) -> "PycordEvent":
"""Find event in the database.
Args:
event_id (str | ObjectId): Event's ID
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
PycordEvent: Event object
Raises:
EventNotFoundError: Event was not found
"""
if cache is not None:
cached_entry: Dict[str, Any] | None = cache.get_json(f"{cls.__short_name__}_{event_id}")
if cached_entry is not None:
return cls(**cached_entry)
db_entry = await cls.__collection__.find_one(
{"_id": event_id if isinstance(event_id, ObjectId) else ObjectId(event_id)}
)
if db_entry is None:
raise RuntimeError(f"Event {event_id} not found")
# TODO Add a unique exception
# raise EventNotFoundError(event_id)
if cache is not None:
cache.set_json(f"{cls.__short_name__}_{event_id}", db_entry)
return cls(**db_entry)
# TODO Implement this method
@classmethod
async def create(
cls,
name: str,
guild_id: int,
creator_id: int,
starts: datetime,
ends: datetime,
thumbnail_id: str | None,
cache: Optional[Cache] = None,
) -> "PycordEvent":
db_entry: Dict[str, Any] = {
"name": name,
"guild_id": guild_id,
"created": datetime.now(tz=timezone.utc),
"creator_id": creator_id,
"starts": starts,
"ends": ends,
"thumbnail_id": thumbnail_id,
"stage_ids": [],
}
insert_result: InsertOneResult = await cls.__collection__.insert_one(db_entry)
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(f"{cls.__short_name__}_{guild_id}", db_entry)
return cls(**db_entry)
async def _set(self, key: str, value: Any, cache: Optional[Cache] = None) -> None:
"""Set attribute data and save it into the database.
Args:
key (str): Attribute to change
value (Any): Value to set
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await self.__collection__.update_one({"_id": self._id}, {"$set": {key: value}}, upsert=True)
self._update_cache(cache)
logger.info("Set attribute '%s' of event %s to '%s'", key, self._id, value)
async def _remove(self, key: str, cache: Optional[Cache] = None) -> None:
"""Remove attribute data and save it into the database.
Args:
key (str): Attribute to remove
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
default_value: Any = PycordEvent.get_default_value(key)
setattr(self, key, default_value)
await self.__collection__.update_one({"_id": self._id}, {"$set": {key: default_value}}, upsert=True)
self._update_cache(cache)
logger.info("Removed attribute '%s' of event %s", key, self._id)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self._id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
user_dict: Dict[str, Any] = self.to_dict()
if user_dict is not None:
cache.set_json(self._get_cache_key(), user_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordEvent object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordEvent
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"name": self.name,
"guild_id": self.guild_id,
"created": self.created,
"creator_id": self.creator_id,
"starts": self.starts,
"ends": self.ends,
"thumbnail_id": self.thumbnail_id,
"stage_ids": self.stage_ids,
}
@staticmethod
def get_defaults() -> Dict[str, Any]:
return {
"name": None,
"guild_id": None,
"created": None,
"creator_id": None,
"starts": None,
"ends": None,
"thumbnail_id": None,
"stage_ids": [],
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in PycordEvent.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in PycordEvent")
return PycordEvent.get_defaults()[key]
async def purge(self, cache: Optional[Cache] = None) -> None:
"""Completely remove event data from database. Currently only removes the event record from events collection.
Args:
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
await self.__collection__.delete_one({"_id": self._id})
self._delete_cache(cache)