228 lines
7.0 KiB
Python
228 lines
7.0 KiB
Python
import logging
|
|
from logging import Logger
|
|
from typing import Any, Dict
|
|
|
|
from bson import ObjectId
|
|
from discord import User, Member
|
|
from libbot.utils import config_get
|
|
from pymongo.results import InsertOneResult
|
|
|
|
from classes.cache import HoloCache
|
|
from errors import UserNotFoundError
|
|
from modules.database import col_users
|
|
|
|
# Module-level logger, named after this module via `__name__`.
logger: Logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HoloUser:
    """Database record that binds a Discord user ID to a custom role and a custom channel.

    Every mutating method writes to the `col_users` collection and, when a cache
    engine is passed, refreshes or deletes the `user_<id>` cache entry as well.
    """

    def __init__(
        self,
        _id: ObjectId,
        id: int,
        custom_role: int | None,
        custom_channel: int | None,
    ) -> None:
        """Store the raw field values; performs no database or cache access

        ### Args:
            * `_id` (ObjectId): Identifier of the database document
            * `id` (int): Discord user ID
            * `custom_role` (int | None): ID of the custom role, if any
            * `custom_channel` (int | None): ID of the custom channel, if any
        """
        # NOTE: the parameter is named `id` (shadowing the builtin) on purpose:
        # `from_user()` builds instances with `cls(**entry)` and the entry key is "id".
        self._id: ObjectId = _id

        self.id: int = id
        self.custom_role: int | None = custom_role
        self.custom_channel: int | None = custom_channel

    @classmethod
    async def from_user(
        cls,
        user: User | Member,
        allow_creation: bool = True,
        cache: HoloCache | None = None,
    ) -> "HoloUser":
        """Get an object that has a proper binding between Discord ID and database

        Lookup order: cache (if provided), then database. If neither has a record
        and creation is allowed, a record with default values is inserted.

        ### Args:
            * `user` (User | Member): Object from which an ID can be extracted
            * `allow_creation` (bool, optional): Whether to allow creation of a new user record if none found. Defaults to True.
            * `cache` (HoloCache | None, optional): Cache engine to get the cache from

        ### Raises:
            * `UserNotFoundError`: No record exists in database and `allow_creation` is False

        ### Returns:
            `HoloUser`: Object built from the cached or stored record
        """
        cache_key: str = f"user_{user.id}"

        if cache is not None:
            cached_entry: Dict[str, Any] | None = cache.get_json(cache_key)

            if cached_entry is not None:
                return cls(**cached_entry)

        db_entry: Dict[str, Any] | None = await col_users.find_one({"id": user.id})

        if db_entry is None:
            if not allow_creation:
                raise UserNotFoundError(user=user, user_id=user.id)

            # `cls` (not a hard-coded class name) so that subclass overrides are honoured
            db_entry = cls.get_defaults(user.id)

            insert_result: InsertOneResult = await col_users.insert_one(db_entry)

            db_entry["_id"] = insert_result.inserted_id

        if cache is not None:
            # NOTE(review): `db_entry` carries an ObjectId under "_id"; this assumes
            # HoloCache.set_json() is able to serialize it - TODO confirm.
            cache.set_json(cache_key, db_entry)

        return cls(**db_entry)

    @classmethod
    async def from_id(cls, user_id: int) -> "HoloUser":
        """Not implemented yet

        ### Raises:
            * `NotImplementedError`: Always
        """
        raise NotImplementedError()

    async def _set(self, key: str, value: Any, cache: HoloCache | None = None) -> None:
        """Set attribute data and save it into the database

        ### Args:
            * `key` (str): Attribute to be changed
            * `value` (Any): Value to set
            * `cache` (HoloCache | None, optional): Cache engine to write the update into

        ### Raises:
            * `AttributeError`: Object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"{type(self).__name__} has no attribute '{key}'")

        setattr(self, key, value)

        await col_users.update_one(
            {"_id": self._id}, {"$set": {key: value}}, upsert=True
        )

        self._update_cache(cache)

        logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value)

    async def _remove(self, key: str, cache: HoloCache | None = None) -> None:
        """Reset attribute data to its default value and save it into the database

        ### Args:
            * `key` (str): Attribute to be removed
            * `cache` (HoloCache | None, optional): Cache engine to write the update into

        ### Raises:
            * `AttributeError`: Object has no attribute named `key`
            * `KeyError`: Attribute exists but has no default value (see `get_default_value()`)
        """
        if not hasattr(self, key):
            raise AttributeError(f"{type(self).__name__} has no attribute '{key}'")

        # Resolved before anything is mutated, so an unknown key leaves the object untouched
        default_value: Any = self.get_default_value(key)

        setattr(self, key, default_value)

        await col_users.update_one(
            {"_id": self._id}, {"$set": {key: default_value}}, upsert=True
        )

        self._update_cache(cache)

        logger.info("Removed attribute '%s' of user %s", key, self.id)

    def _get_cache_key(self) -> str:
        """Return the cache key of this user, in form of `user_<id>`"""
        return f"user_{self.id}"

    def _update_cache(self, cache: HoloCache | None = None) -> None:
        """Write the current state of the object into the cache

        Does nothing when no cache engine is provided.

        ### Args:
            * `cache` (HoloCache | None, optional): Cache engine to write the update into
        """
        if cache is None:
            return

        # `_to_dict()` always returns a dict, so there is no "nothing to store" case
        cache.set_json(self._get_cache_key(), self._to_dict())

    def _delete_cache(self, cache: HoloCache | None = None) -> None:
        """Delete the cache entry of this user

        Does nothing when no cache engine is provided.

        ### Args:
            * `cache` (HoloCache | None, optional): Cache engine to delete the entry from
        """
        if cache is None:
            return

        cache.delete(self._get_cache_key())

    @staticmethod
    def get_defaults(user_id: int | None = None) -> Dict[str, Any]:
        """Build a record with default values for a new user

        ### Args:
            * `user_id` (int | None, optional): Discord user ID to put into the record. Defaults to None.

        ### Returns:
            `Dict[str, Any]`: Fresh dictionary with keys `id`, `custom_role` and `custom_channel`
        """
        return {
            "id": user_id,
            "custom_role": None,
            "custom_channel": None,
        }

    @staticmethod
    def get_default_value(key: str) -> Any:
        """Get the default value of a single attribute

        ### Args:
            * `key` (str): Attribute name, must be one of the keys of `get_defaults()`

        ### Raises:
            * `KeyError`: There is no default value for such key

        ### Returns:
            `Any`: Default value of the attribute
        """
        try:
            return HoloUser.get_defaults()[key]
        except KeyError:
            # Re-raised with a descriptive message; the lookup error itself adds nothing
            raise KeyError(
                f"There's no default value for key '{key}' in HoloUser"
            ) from None

    def _to_dict(self) -> Dict[str, Any]:
        """Convert the object into a dictionary that mirrors the constructor arguments

        ### Returns:
            `Dict[str, Any]`: Dictionary with keys `_id`, `id`, `custom_role` and `custom_channel`
        """
        return {
            "_id": self._id,
            "id": self.id,
            "custom_role": self.custom_role,
            "custom_channel": self.custom_channel,
        }

    async def set_custom_channel(
        self, channel_id: int, cache: HoloCache | None = None
    ) -> None:
        """Set the custom channel of the user and save it into the database

        ### Args:
            * `channel_id` (int): ID of the channel
            * `cache` (HoloCache | None, optional): Cache engine to write the update into
        """
        await self._set("custom_channel", channel_id, cache=cache)

    async def set_custom_role(
        self, role_id: int, cache: HoloCache | None = None
    ) -> None:
        """Set the custom role of the user and save it into the database

        ### Args:
            * `role_id` (int): ID of the role
            * `cache` (HoloCache | None, optional): Cache engine to write the update into
        """
        await self._set("custom_role", role_id, cache=cache)

    async def remove_custom_channel(self, cache: HoloCache | None = None) -> None:
        """Reset the custom channel of the user to its default value

        ### Args:
            * `cache` (HoloCache | None, optional): Cache engine to write the update into
        """
        await self._remove("custom_channel", cache=cache)

    async def remove_custom_role(self, cache: HoloCache | None = None) -> None:
        """Reset the custom role of the user to its default value

        ### Args:
            * `cache` (HoloCache | None, optional): Cache engine to write the update into
        """
        await self._remove("custom_role", cache=cache)

    async def purge(self, cache: HoloCache | None = None) -> None:
        """Completely remove user data from database. Only removes the user record from users collection.

        ### Args:
            * `cache` (HoloCache | None, optional): Cache engine to delete the user's entry from
        """
        await col_users.delete_one({"_id": self._id})
        self._delete_cache(cache)

    @staticmethod
    async def is_moderator(member: User | Member) -> bool:
        """Check if user is moderator or council member

        Role IDs are read from config entries `roles.moderators` and `roles.council`.
        A plain `User` object is never considered a moderator.

        ### Args:
            * `member` (User | Member): Member object

        ### Returns:
            `bool`: `True` if member is a moderator or member of council and `False` if not
        """
        if isinstance(member, User):
            return False

        moderator_role: int | None = await config_get("moderators", "roles")
        council_role: int | None = await config_get("council", "roles")

        privileged_roles: tuple[int | None, int | None] = (moderator_role, council_role)

        return any(role.id in privileged_roles for role in member.roles)

    @staticmethod
    async def is_council(member: User | Member) -> bool:
        """Check if user is a member of council

        Role ID is read from config entry `roles.council`.
        A plain `User` object is never considered a council member.

        ### Args:
            * `member` (User | Member): Member object

        ### Returns:
            `bool`: `True` if member is a member of council and `False` if not
        """
        if isinstance(member, User):
            return False

        council_role = await config_get("council", "roles")

        return any(role.id == council_role for role in member.roles)
|