# Discord/classes/holo_user.py
#
# 228 lines
# 6.9 KiB
# Python

import logging
from logging import Logger
from typing import Any, Dict, Optional
from bson import ObjectId
from discord import User, Member
from libbot.cache.classes import Cache
from libbot.utils import config_get
from pymongo.results import InsertOneResult
from errors import UserNotFoundError
from modules.database import col_users
# Module-level logger, named after this module so records can be filtered per module.
logger: Logger = logging.getLogger(__name__)
class HoloUser:
    """Database-backed record of a Discord user.

    An instance mirrors one document of the `col_users` collection and, when a
    cache engine is supplied, keeps the cache entry `user_<id>` in sync with it.

    ### Attributes:
        * `_id` (ObjectId): Identifier of the database document
        * `id` (int): Discord ID of the user
        * `custom_role` (int | None): Custom role ID or `None` when unset
        * `custom_channel` (int | None): Custom channel ID or `None` when unset
    """

    def __init__(
        self,
        _id: ObjectId,
        id: int,
        custom_role: int | None,
        custom_channel: int | None,
    ) -> None:
        # Parameter names intentionally match the document keys so that an
        # entry can be unpacked straight into the constructor (`cls(**entry)`).
        self._id: ObjectId = _id
        self.id: int = id
        self.custom_role: int | None = custom_role
        self.custom_channel: int | None = custom_channel

    @classmethod
    async def from_user(
        cls,
        user: User | Member,
        allow_creation: bool = True,
        cache: Optional[Cache] = None,
    ) -> "HoloUser":
        """Get an object that has a proper binding between Discord ID and database

        Lookup order: cache (if provided), then database. When no database record
        exists and `allow_creation` is set, a record with default values is inserted.

        ### Args:
            * `user` (User | Member): Object from which an ID can be extracted
            * `allow_creation` (bool, optional): Whether to allow creation of a new user record if none found. Defaults to True.
            * `cache` (Cache, optional): Cache engine to get the cache from

        ### Raises:
            * `UserNotFoundError`: User with such ID does not seem to exist in database

        ### Returns:
            `HoloUser`: Object bound to the user's database record
        """
        if cache is not None:
            cached_entry: Dict[str, Any] | None = cache.get_json(
                cls._cache_key_for(user.id)
            )

            if cached_entry is not None:
                # The cached copy stores `_id` as a string (see `_update_cache`).
                # Restore the ObjectId so `{"_id": self._id}` filters keep
                # matching the stored document instead of upserting a new one.
                return cls(**{**cached_entry, "_id": ObjectId(cached_entry["_id"])})

        db_entry: Dict[str, Any] | None = await col_users.find_one({"id": user.id})

        if db_entry is None:
            if not allow_creation:
                raise UserNotFoundError(user=user, user_id=user.id)

            db_entry = HoloUser.get_defaults(user.id)
            insert_result: InsertOneResult = await col_users.insert_one(db_entry)
            db_entry["_id"] = insert_result.inserted_id

        holo_user: "HoloUser" = cls(**db_entry)
        holo_user._update_cache(cache)

        return holo_user

    @classmethod
    async def from_id(cls, user_id: int) -> "HoloUser":
        """Get an object by a raw Discord ID. Not implemented yet.

        ### Raises:
            * `NotImplementedError`: Always
        """
        raise NotImplementedError()

    async def _set(self, key: str, value: Any, cache: Optional[Cache] = None) -> None:
        """Set attribute data and save it into the database

        ### Args:
            * `key` (str): Attribute to be changed
            * `value` (Any): Value to set
            * `cache` (Cache, optional): Cache engine to write the update into

        ### Raises:
            * `AttributeError`: Object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"HoloUser has no attribute '{key}'")

        # Persist first: if the database call raises, the in-memory object
        # still reflects what is actually stored.
        await col_users.update_one(
            {"_id": self._id}, {"$set": {key: value}}, upsert=True
        )
        setattr(self, key, value)

        self._update_cache(cache)

        logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value)

    async def _remove(self, key: str, cache: Optional[Cache] = None) -> None:
        """Remove attribute data and save it into the database

        "Removing" resets the attribute to its default from `get_defaults()`.

        ### Args:
            * `key` (str): Attribute to be removed
            * `cache` (Cache, optional): Cache engine to write the update into

        ### Raises:
            * `AttributeError`: Object has no attribute named `key`
            * `KeyError`: Attribute exists but has no default value
        """
        if not hasattr(self, key):
            raise AttributeError(f"HoloUser has no attribute '{key}'")

        default_value: Any = HoloUser.get_default_value(key)

        # Same ordering as in `_set`: database first, in-memory state second.
        await col_users.update_one(
            {"_id": self._id}, {"$set": {key: default_value}}, upsert=True
        )
        setattr(self, key, default_value)

        self._update_cache(cache)

        logger.info("Removed attribute '%s' of user %s", key, self.id)

    @staticmethod
    def _cache_key_for(user_id: int) -> str:
        """Build the cache key used for the user with the provided Discord ID."""
        return f"user_{user_id}"

    def _get_cache_key(self) -> str:
        """Return the cache key of this user."""
        return self._cache_key_for(self.id)

    def _update_cache(self, cache: Optional[Cache] = None) -> None:
        """Write the current state of the object into the cache.

        ### Args:
            * `cache` (Cache, optional): Cache engine to write into. Nothing happens when omitted.
        """
        if cache is None:
            return

        # ObjectId is not serializable by the standard JSON encoder, so the
        # cached copy carries `_id` as a string; `from_user` converts it back.
        cache.set_json(self._get_cache_key(), self._to_dict(json_compatible=True))

    def _delete_cache(self, cache: Optional[Cache] = None) -> None:
        """Delete the cache entry of this user.

        ### Args:
            * `cache` (Cache, optional): Cache engine to delete from. Nothing happens when omitted.
        """
        if cache is None:
            return

        cache.delete(self._get_cache_key())

    @staticmethod
    def get_defaults(user_id: int | None = None) -> Dict[str, Any]:
        """Get a new user record filled with default values.

        ### Args:
            * `user_id` (int | None, optional): Discord ID to put into the record. Defaults to None.

        ### Returns:
            `Dict[str, Any]`: Fresh dictionary with keys `id`, `custom_role` and `custom_channel`
        """
        return {
            "id": user_id,
            "custom_role": None,
            "custom_channel": None,
        }

    @staticmethod
    def get_default_value(key: str) -> Any:
        """Get the default value of a single attribute.

        ### Args:
            * `key` (str): Attribute name

        ### Raises:
            * `KeyError`: `get_defaults()` has no entry for `key`

        ### Returns:
            `Any`: Default value of the attribute
        """
        defaults: Dict[str, Any] = HoloUser.get_defaults()

        if key not in defaults:
            raise KeyError(f"There's no default value for key '{key}' in HoloUser")

        return defaults[key]

    def _to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
        """Convert the object into a dictionary shaped like its database record.

        ### Args:
            * `json_compatible` (bool, optional): Whether to convert `_id` to a string. Defaults to False.

        ### Returns:
            `Dict[str, Any]`: Dictionary with keys `_id`, `id`, `custom_role` and `custom_channel`
        """
        return {
            "_id": str(self._id) if json_compatible else self._id,
            "id": self.id,
            "custom_role": self.custom_role,
            "custom_channel": self.custom_channel,
        }

    async def set_custom_channel(
        self, channel_id: int, cache: Optional[Cache] = None
    ) -> None:
        """Set the custom channel of the user and save it.

        ### Args:
            * `channel_id` (int): Channel ID to store
            * `cache` (Cache, optional): Cache engine to write the update into
        """
        await self._set("custom_channel", channel_id, cache=cache)

    async def set_custom_role(
        self, role_id: int, cache: Optional[Cache] = None
    ) -> None:
        """Set the custom role of the user and save it.

        ### Args:
            * `role_id` (int): Role ID to store
            * `cache` (Cache, optional): Cache engine to write the update into
        """
        await self._set("custom_role", role_id, cache=cache)

    async def remove_custom_channel(self, cache: Optional[Cache] = None) -> None:
        """Reset the custom channel of the user to its default and save it.

        ### Args:
            * `cache` (Cache, optional): Cache engine to write the update into
        """
        await self._remove("custom_channel", cache=cache)

    async def remove_custom_role(self, cache: Optional[Cache] = None) -> None:
        """Reset the custom role of the user to its default and save it.

        ### Args:
            * `cache` (Cache, optional): Cache engine to write the update into
        """
        await self._remove("custom_role", cache=cache)

    async def purge(self, cache: Optional[Cache] = None) -> None:
        """Completely remove user data from database. Only removes the user record from users collection.

        ### Args:
            * `cache` (Cache, optional): Cache engine to write the update into
        """
        await col_users.delete_one({"_id": self._id})

        self._delete_cache(cache)

    @staticmethod
    async def is_moderator(member: User | Member) -> bool:
        """Check if user is moderator or council member

        ### Args:
            * `member` (User | Member): Member object

        ### Returns:
            `bool`: `True` if member is a moderator or member of council and `False` if not
        """
        # `User` instances are rejected before any role lookup; only objects
        # exposing `.roles` get past this point.
        if isinstance(member, User):
            return False

        moderator_role: int | None = await config_get("moderators", "roles")
        council_role: int | None = await config_get("council", "roles")
        staff_roles = (moderator_role, council_role)

        return any(role.id in staff_roles for role in member.roles)

    @staticmethod
    async def is_council(member: User | Member) -> bool:
        """Check if user is a member of council

        ### Args:
            * `member` (User | Member): Member object

        ### Returns:
            `bool`: `True` if member is a member of council and `False` if not
        """
        # `User` instances are rejected before any role lookup; only objects
        # exposing `.roles` get past this point.
        if isinstance(member, User):
            return False

        council_role = await config_get("council", "roles")

        return any(role.id == council_role for role in member.roles)