Telegram/classes/holo_user.py

105 lines
4.5 KiB
Python

from app import app, isAnAdmin
from typing import Any, List, Union
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
from modules.database import col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
from modules.logging import logWrite
from modules.utils import configGet, locale, should_quote
class UserNotFoundError(Exception):
    """Raised when no database record matches the ID resolved by HoloUser.

    ### Attributes:
        * `user`: Object that was originally handed to `HoloUser`
        * `user_id`: Telegram ID extracted from that object
    """

    def __init__(self, user, user_id):
        # Build the message from the arguments first, then keep them on the
        # exception so handlers can inspect what was looked up.
        message = f"User of type {type(user)} with id {user_id} was not found"
        super().__init__(message)
        self.user = user
        self.user_id = user_id
class UserInvalidError(Exception):
    """Raised when the object given to HoloUser cannot be turned into a user ID.

    ### Attributes:
        * `user`: The rejected input object
    """

    def __init__(self, user):
        message = f"Could not find HoloUser by using {type(user)} as an input type"
        super().__init__(message)
        # Kept on the exception so handlers can see what was rejected.
        self.user = user
class HoloUser():
    """Telegram user paired with its record from the `col_users` collection.

    ### Attributes:
        * `id`: Telegram user ID resolved from the constructor argument
        * `db_id`: `_id` of the matching database record
        * `link`, `label`: Values of the record's `link` and `label` fields
        * `name`, `phone`, `locale`, `username`: Values of the record's `tg_name`, `tg_phone`, `tg_locale` and `tg_username` fields
    """

    def __init__(self, user: Union[List[User], User, ChatMember, int, str]) -> None:
        """Resolve `user` to a Telegram ID and load its database record

        ### Args:
            * `user` (`Union[List[User], User, ChatMember, int, str]`): Non-empty list of users (the first one is used), a user, a chat member, a numeric ID or a string that is looked up through `app.get_users`

        ### Raises:
            * `UserInvalidError`: `user` is of an unsupported type, is an empty list, or is a string Telegram could not resolve
            * `UserNotFoundError`: No record with the resolved ID exists in `col_users`
        """

        # Determine input object class and extract id
        if isinstance(user, list) and len(user) != 0:
            self.id = user[0].id
        elif isinstance(user, User):
            self.id = user.id
        elif isinstance(user, ChatMember):
            self.id = user.user.id
        elif isinstance(user, int):
            self.id = user
        elif isinstance(user, str):
            try:
                # NOTE(review): get_users is called without `await`; this only yields a
                # user object if the client exposes a synchronous call in this context.
                # The original author marked this line as requiring testing - confirm.
                self.id = (app.get_users(user)).id
            except (bad_request_400.UsernameNotOccupied, bad_request_400.PeerIdInvalid) as exp:
                # Both errors mean the string does not point to a usable peer;
                # chain the cause so the Telegram error stays visible in tracebacks.
                raise UserInvalidError(user) from exp
        else:
            raise UserInvalidError(user)

        # Find user record in DB
        holo_user = col_users.find_one({"user": self.id})

        if holo_user is None:
            raise UserNotFoundError(user=user, user_id=self.id)

        self.db_id = holo_user["_id"]

        self.link = holo_user["link"]
        self.label = holo_user["label"]
        self.name = holo_user["tg_name"]
        self.phone = holo_user["tg_phone"]
        self.locale = holo_user["tg_locale"]
        self.username = holo_user["tg_username"]

    def set(self, key: str, value: Any) -> None:
        """Set attribute data and save it into database

        ### Args:
            * `key` (`str`): Attribute to be changed
            * `value` (`Any`): Value to set

        ### Raises:
            * `AttributeError`: The object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"{type(self).__name__} has no attribute '{key}'")
        setattr(self, key, value)
        # NOTE(review): `key` is used as the database field name as-is, while `name`,
        # `phone`, `locale` and `username` are loaded from `tg_*` fields - setting those
        # attributes writes a differently named field. Confirm this is intended.
        col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { key: value } }, upsert=True)
        logWrite(f"Set attribute {key} of user {self.id} to {value}")

    async def message(self, origin: Message, text: Union[str, None] = None, photo: Union[str, None] = None, video: Union[str, None] = None, file: Union[str, None] = None) -> None:
        """Send a text message to the user and confirm delivery to the sender

        The link between the origin message and the delivered one is stored in `col_messages`.

        ### Args:
            * `origin` (`Message`): Message that requested the sending; the confirmation is sent as a reply to it
            * `text` (`Union[str, None]`): Text to send; the localized reply notice is appended to it
            * `photo`, `video`, `file` (`Union[str, None]`): Accepted but not used by this method yet

        ### Raises:
            * `TypeError`: `text` is `None`
        """
        # Only text is implemented, so fail with a clear error before anything is sent
        # instead of the opaque `None + str` TypeError.
        if text is None:
            raise TypeError("HoloUser.message() requires text: photo, video and file are not supported yet")

        new_message = await app.send_message(self.id, text+locale("message_reply_notice", "message"))
        await origin.reply_text(locale("message_sent", "message"), quote=should_quote(origin))

        # `command` can be missing on the origin message; the message is already sent at
        # this point, so logging must not prevent the link below from being stored.
        command_args = origin.command[2:] if origin.command else []
        logWrite(f"Admin {origin.from_user.id} sent message '{' '.join(command_args)}' to {self.id}")
        col_messages.insert_one({"origin": {"chat": origin.chat.id, "id": origin.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})

    async def set_label(self, chat: Chat, label: str) -> None:
        """Set label in destination group

        ### Args:
            * `chat` (`Chat`): Chat the request came from; the title is only applied when it is the configured admin group
            * `label` (`str`): Label you want to set
        """
        # set() updates both the attribute and the database record
        self.set("label", label)
        await app.promote_chat_member(configGet("destination_group"), self.id)
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.set_administrator_title(configGet("destination_group"), self.id, label)

    async def reset_label(self, chat: Chat) -> None:
        """Reset label in destination group

        ### Args:
            * `chat` (`Chat`): Chat the request came from; privileges are only changed when it is the configured admin group
        """
        # set() updates both the attribute and the database record
        self.set("label", "")
        await app.set_administrator_title(configGet("destination_group"), self.id, "")
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.promote_chat_member(configGet("destination_group"), self.id, privileges=ChatPrivileges(
                can_manage_chat=False
            ))