from app import app, isAnAdmin from typing import Any, List, Union from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message from pyrogram.client import Client from pyrogram.errors import bad_request_400 from modules.database import col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages from modules.logging import logWrite from modules.utils import configGet, locale, should_quote class UserNotFoundError(Exception): """HoloUser could not find user with such an ID in database""" def __init__(self, user, user_id): self.user = user self.user_id = user_id super().__init__(f"User of type {type(self.user)} with id {self.user_id} was not found") class UserInvalidError(Exception): """Provided to HoloUser object is not supported""" def __init__(self, user): self.user = user super().__init__(f"Could not find HoloUser by using {type(self.user)} as an input type") class HoloUser(): def __init__(self, user: Union[List[User], User, ChatMember, int, str]) -> None: # Determine input object class and extract id if isinstance(user, list) and len(user) != 0: self.id = user[0].id elif isinstance(user, User): self.id = user.id elif isinstance(user, ChatMember): self.id = user.user.id elif isinstance(user, int): self.id = user elif isinstance(user, str): try: self.id = (app.get_users(user)).id except bad_request_400.UsernameNotOccupied: raise UserInvalidError(user) except bad_request_400.PeerIdInvalid: raise UserInvalidError(user) else: raise UserInvalidError(user) # Find user record in DB holo_user = col_users.find_one({"user": self.id}) if holo_user is None: raise UserNotFoundError(user=user, user_id=self.id) self.db_id = holo_user["_id"] self.link = holo_user["link"] self.label = holo_user["label"] self.name = holo_user["tg_name"] self.phone = holo_user["tg_phone"] self.locale = holo_user["tg_locale"] self.username = holo_user["tg_username"] def set(self, key: str, value: Any) -> None: """Set attribute data and save it into database ### Args: * `key` (`str`): Attribute to be changed * `value` (`Any`): Value to set """ if not hasattr(self, key): raise AttributeError() setattr(self, key, value) col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { key: value } }, upsert=True) logWrite(f"Set attribute {key} of user {self.id} to {value}") async def message(self, origin: Message, text: Union[str, None] = None, photo: Union[str, None] = None, video: Union[str, None] = None, file: Union[str, None] = None): new_message = await app.send_message(self.id, text+locale("message_reply_notice", "message")) await origin.reply_text(locale("message_sent", "message"), quote=should_quote(origin)) logWrite(f"Admin {origin.from_user.id} sent message '{' '.join(origin.command[2:])}' to {self.id}") col_messages.insert_one({"origin": {"chat": origin.chat.id, "id": origin.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}}) async def set_label(self, chat: Chat, label: str): """Set label in destination group ### Args: * app (`Client`): Pyrogram client * label (`str`): Label you want to set """ self.label = label self.set("label", label) await app.promote_chat_member(configGet("destination_group"), self.id) if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")): await app.set_administrator_title(configGet("destination_group"), self.id, label) async def reset_label(self, chat: Chat): """Reset label in destination group ### Args: * app (`Client`): Pyrogram client """ self.label = "" self.set("label", "") await app.set_administrator_title(configGet("destination_group"), self.id, "") if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")): await app.promote_chat_member(configGet("destination_group"), self.id, privileges=ChatPrivileges( can_manage_chat=False ))