187 lines
5.6 KiB
Python
187 lines
5.6 KiB
Python
import logging
|
|
from logging import Logger
|
|
from typing import Any, Dict
|
|
|
|
from bson import ObjectId
|
|
from discord import User, Member
|
|
from libbot.utils import config_get
|
|
from pymongo.results import InsertOneResult
|
|
|
|
from errors import UserNotFoundError
|
|
from modules.database import col_warnings, col_users
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger: Logger = logging.getLogger(__name__)
|
|
|
|
|
|
class HoloUser:
    """Binding between a Discord user ID and its entry in the users collection.

    ### Attributes:
        * `_id` (ObjectId): `_id` of the user's document in `col_users`
        * `id` (int): Discord user ID (stored under the `user` key in the database)
        * `custom_role` (int | None): Value set by `set_custom_role`, `None` if unset
        * `custom_channel` (int | None): Value set by `set_custom_channel`, `None` if unset
    """

    def __init__(
        self,
        _id: ObjectId,
        id: int,
        custom_role: int | None,
        custom_channel: int | None,
    ) -> None:
        """Create an object from already known database values

        ### Args:
            * `_id` (ObjectId): `_id` of the user's document in `col_users`
            * `id` (int): Discord user ID
            * `custom_role` (int | None): Custom role value or `None`
            * `custom_channel` (int | None): Custom channel value or `None`
        """
        self._id: ObjectId = _id

        self.id: int = id
        self.custom_role: int | None = custom_role
        self.custom_channel: int | None = custom_channel

    @classmethod
    async def from_user(
        cls, user: User | Member, allow_creation: bool = True
    ) -> "HoloUser":
        """Get an object that has a proper binding between Discord ID and database

        ### Args:
            * `user` (User | Member): Object from which an ID can be extracted
            * `allow_creation` (bool, optional): Insert a new database entry if none exists. Defaults to True.

        ### Raises:
            * `UserNotFoundError`: User with such ID does not seem to exist in database and `allow_creation` is `False`

        ### Returns:
            * `HoloUser`: Object bound to the found (or newly created) database entry
        """
        db_entry: Dict[str, Any] | None = await col_users.find_one({"user": user.id})

        if db_entry is None:
            if not allow_creation:
                raise UserNotFoundError(user=user, user_id=user.id)

            db_entry = {
                "user": user.id,
                "custom_role": None,
                "custom_channel": None,
            }

            insert_result: InsertOneResult = await col_users.insert_one(db_entry)

            # `inserted_id` is a property of InsertOneResult, it must not be called
            db_entry["_id"] = insert_result.inserted_id

        # Fields are picked explicitly instead of unpacking the whole document:
        # `_remove` uses `$unset`, so optional keys may be absent from the stored
        # document, and unknown extra keys must not break construction.
        return cls(
            _id=db_entry["_id"],
            id=db_entry["user"],
            custom_role=db_entry.get("custom_role"),
            custom_channel=db_entry.get("custom_channel"),
        )

    @classmethod
    async def from_id(cls, user_id: int) -> "HoloUser":
        """Get an object by Discord user ID. Not implemented yet.

        ### Args:
            * `user_id` (int): Discord user ID

        ### Raises:
            * `NotImplementedError`: Always, until this constructor is implemented
        """
        # `NotImplemented` is the sentinel for binary operator methods; an
        # unimplemented method should raise instead of returning it.
        raise NotImplementedError("HoloUser.from_id is not implemented")

    async def get_warnings(self) -> int:
        """Get number of warnings user has

        ### Returns:
            * `int`: Number of warnings, `0` if the user has no warnings entry
        """
        warns: Dict[str, Any] | None = await col_warnings.find_one({"user": self.id})

        return 0 if warns is None else warns["warns"]

    async def warn(self, count: int = 1, reason: str = "Not provided") -> None:
        """Warn and add count to warns number

        ### Args:
            * `count` (int, optional): Count of warnings to be added. Defaults to 1.
            * `reason` (str, optional): Reason of the warning, only written to the log. Defaults to "Not provided".
        """
        # The warnings document is keyed by the Discord user ID, not by the `_id`
        # of the document in the users collection. A single `$inc` upsert both
        # creates the entry when missing and increments it atomically otherwise.
        await col_warnings.update_one(
            {"user": self.id},
            {"$inc": {"warns": count}},
            upsert=True,
        )

        logger.info("User %s was warned %s times due to: %s", self.id, count, reason)

    async def _set(self, key: str, value: Any) -> None:
        """Set attribute data and save it into the database

        ### Args:
            * `key` (str): Attribute to be changed
            * `value` (Any): Value to set

        ### Raises:
            * `AttributeError`: Object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            )

        setattr(self, key, value)

        await col_users.update_one(
            {"_id": self._id}, {"$set": {key: value}}, upsert=True
        )

        logger.info("Set attribute %s of user %s to %s", key, self.id, value)

    async def _remove(self, key: str) -> None:
        """Remove attribute data and save it into the database

        The attribute on the object is set to `None`; the field is removed
        from the database document with `$unset`.

        ### Args:
            * `key` (str): Attribute to be removed

        ### Raises:
            * `AttributeError`: Object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {key!r}"
            )

        setattr(self, key, None)

        await col_users.update_one(
            {"_id": self._id}, {"$unset": {key: None}}, upsert=True
        )

        logger.info("Removed attribute %s of user %s", key, self.id)

    async def set_custom_channel(self, channel_id: int) -> None:
        """Set `custom_channel` to `channel_id` and save it into the database"""
        await self._set("custom_channel", channel_id)

    async def set_custom_role(self, role_id: int) -> None:
        """Set `custom_role` to `role_id` and save it into the database"""
        await self._set("custom_role", role_id)

    async def remove_custom_channel(self) -> None:
        """Reset `custom_channel` and remove it from the database"""
        await self._remove("custom_channel")

    async def remove_custom_role(self) -> None:
        """Reset `custom_role` and remove it from the database"""
        await self._remove("custom_role")

    async def purge(self) -> None:
        """Completely remove user data from database. Will not remove transactions logs and warnings."""
        await col_users.delete_one({"_id": self._id})

    @staticmethod
    async def is_moderator(member: User | Member) -> bool:
        """Check if user is moderator or council member

        ### Args:
            * `member` (User | Member): Member object

        ### Returns:
            `bool`: `True` if member is a moderator or member of council and `False` if not
        """
        # A plain `User` is rejected up front; only `member.roles` is inspected below
        if isinstance(member, User):
            return False

        moderator_role: int | None = await config_get("moderators", "roles")
        council_role: int | None = await config_get("council", "roles")

        privileged_roles = (moderator_role, council_role)

        return any(role.id in privileged_roles for role in member.roles)

    @staticmethod
    async def is_council(member: User | Member) -> bool:
        """Check if user is a member of council

        ### Args:
            * `member` (User | Member): Member object

        ### Returns:
            `bool`: `True` if member is a member of council and `False` if not
        """
        # A plain `User` is rejected up front; only `member.roles` is inspected below
        if isinstance(member, User):
            return False

        council_role: int | None = await config_get("council", "roles")

        return any(role.id == council_role for role in member.roles)
|