Telegram/classes/holo_user.py

196 lines
8.6 KiB
Python

from traceback import print_exc
from app import app, isAnAdmin
from typing import Any, List, Union
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
from modules.database import col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
from modules.logging import logWrite
from modules.utils import configGet, locale, should_quote
class UserNotFoundError(Exception):
    """Raised when HoloUser cannot find a database record for the resolved user ID.

    The original lookup input is kept as `user` and the resolved Telegram ID
    as `user_id`, so a handler catching this error can inspect both.
    """

    def __init__(self, user, user_id):
        self.user, self.user_id = user, user_id
        # The message reports the *type* of the lookup input, not its value.
        details = f"User of type {type(self.user)} with id {self.user_id} was not found"
        super().__init__(details)
class UserInvalidError(Exception):
    """Raised when the object provided to HoloUser cannot be used to identify a user.

    The rejected input is kept as `user` for inspection by the caller.
    """

    def __init__(self, user):
        self.user = user
        # Only the input's type goes into the message, never its value.
        input_type = type(self.user)
        super().__init__(f"Could not find HoloUser by using {input_type} as an input type")
class HoloUser():
    """Telegram user backed by a record in the `col_users` collection.

    Construction resolves a Telegram ID from the given input and copies the
    stored record onto the instance:

    * `id`: Telegram ID used for the lookup (record field `user`)
    * `db_id`: record `_id`
    * `link`, `label`: record fields of the same name
    * `name`, `phone`, `locale`, `username`: record fields `tg_name`,
      `tg_phone`, `tg_locale`, `tg_username`
    """

    def __init__(self, user: Union[List[User], User, ChatMember, int, str]) -> None:
        """Resolve `user` to a Telegram ID and load the matching database record.

        ### Args:
            * `user` (`Union[List[User], User, ChatMember, int, str]`): A non-empty list (its first element is used), a `User`, a `ChatMember`, a numeric ID, or a string passed to `app.get_users`

        ### Raises:
            * `UserInvalidError`: Input is of an unsupported type, is an empty list, or the string lookup was rejected with `UsernameNotOccupied`/`PeerIdInvalid`
            * `UserNotFoundError`: No record with this ID exists in `col_users`
        """

        # Determine input object class and extract id
        if isinstance(user, list) and user:
            self.id = user[0].id
        elif isinstance(user, User):
            self.id = user.id
        elif isinstance(user, ChatMember):
            self.id = user.user.id
        elif isinstance(user, int):
            self.id = user
        elif isinstance(user, str):
            try:
                # NOTE(review): the original author marked this line as untested.
                # `get_users` is called without `await`, so it presumably only
                # yields a user object when Pyrogram's sync wrapper can run it
                # to completion - confirm before relying on string input.
                self.id = (app.get_users(user)).id
            except (bad_request_400.UsernameNotOccupied, bad_request_400.PeerIdInvalid) as exc:
                # Chain the cause so the underlying API error stays visible.
                raise UserInvalidError(user) from exc
        else:
            raise UserInvalidError(user)

        # Find user record in DB
        holo_user = col_users.find_one({"user": self.id})

        if holo_user is None:
            raise UserNotFoundError(user=user, user_id=self.id)

        self.db_id = holo_user["_id"]

        self.link = holo_user["link"]
        self.label = holo_user["label"]
        self.name = holo_user["tg_name"]
        self.phone = holo_user["tg_phone"]
        self.locale = holo_user["tg_locale"]
        self.username = holo_user["tg_username"]

    def set(self, key: str, value: Any) -> None:
        """Set attribute data and save it into database

        ### Args:
            * `key` (`str`): Attribute to be changed
            * `value` (`Any`): Value to set

        ### Raises:
            * `AttributeError`: The instance has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"HoloUser has no attribute '{key}'")
        setattr(self, key, value)
        # NOTE(review): the record is updated under `key` itself, so attributes
        # loaded from `tg_`-prefixed fields (`name`, `phone`, `locale`,
        # `username`) are written to a different field than the one they were
        # read from - confirm this is intended.
        col_users.update_one(filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True)
        logWrite(f"Set attribute {key} of user {self.id} to {value}")

    @staticmethod
    def _annotate(body: Union[str, None], context: Message, adm_origin: bool, adm_context: bool) -> str:
        """Return `body` decorated with the reply notice and, for `adm_origin`, a sender header."""
        if body is None:
            # Nothing was supplied: the reply notice alone becomes the body.
            return locale("message_reply_notice", "message")
        if adm_context:
            return body + locale("message_reply_notice", "message")
        if adm_origin:
            return locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + body + locale("message_reply_notice", "message")
        return body

    async def _deliver(self, origin: Union[Message, None], text: str, caption: str, attachments: tuple) -> Message:
        """Send the first non-empty attachment (or `text` if there is none) and return the sent message."""
        for kind, media, media_type in attachments:
            if media is None:
                continue
            # Pyrogram media objects are sent by their file_id; strings pass through unchanged.
            if isinstance(media, media_type):
                media = media.file_id
            if origin is not None:
                # Resolves to origin.reply_photo / reply_video / reply_document /
                # reply_animation / reply_voice.
                return await getattr(origin, f"reply_{kind}")(media, caption=caption, quote=True)
            # Resolves to app.send_photo / send_video / send_document /
            # send_animation / send_voice. Every one of them takes the target
            # chat first and has no `quote` argument.
            return await getattr(app, f"send_{kind}")(self.id, media, caption=caption)

        if origin is not None:
            return await origin.reply_text(text, quote=True)
        return await app.send_message(self.id, text)

    async def message(self,
        context: Message,
        origin: Union[Message, None] = None,
        text: Union[str, None] = None,
        caption: Union[str, None] = None,
        photo: Union[str, Photo, None] = None,
        video: Union[str, Video, None] = None,
        file: Union[str, Document, None] = None,
        animation: Union[str, Animation, None] = None,
        voice: Union[str, Voice, None] = None,
        adm_origin: bool = False,
        adm_context: bool = False
    ) -> None:
        """Send a text or one media attachment to this user and report the result to `context`

        Only the first attachment that is not `None` is sent, checked in the
        order photo, video, file, animation, voice. Without any attachment,
        `text` is sent. On success a confirmation is replied to `context` and
        the origin/destination message pair is stored in `col_messages`. On
        any exception a traceback report and an error notice are replied to
        `context` instead.

        ### Args:
            * `context` (`Message`): Message that triggered the sending; its author is logged and receives the confirmation or the error
            * `origin` (`Union[Message, None]`): If given, the outgoing message is sent as a quoted reply to it; otherwise it is sent to this user's ID
            * `text` (`Union[str, None]`): Text to send when there is no attachment
            * `caption` (`Union[str, None]`): Caption for the attachment
            * `photo`, `video`, `file`, `animation`, `voice`: Attachment as a string or the matching Pyrogram media object
            * `adm_origin` (`bool`): Prepend the sender header and append the reply notice (only checked when `adm_context` is false)
            * `adm_context` (`bool`): Append the reply notice
        """

        if text is not None:
            logWrite(f"{context.from_user.id} sent message '{text}' to {self.id}")
        elif caption is not None:
            logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id}")
        else:
            logWrite(f"{context.from_user.id} sent message to {self.id}")

        # A missing text/caption is replaced by the reply notice; an existing
        # one is only decorated, never overwritten.
        text = self._annotate(text, context, adm_origin, adm_context)
        caption = self._annotate(caption, context, adm_origin, adm_context)

        # (method-name suffix, value, Pyrogram media class) in priority order.
        attachments = (
            ("photo", photo, Photo),
            ("video", video, Video),
            ("document", file, Document),
            ("animation", animation, Animation),
            ("voice", voice, Voice),
        )

        try:
            new_message = await self._deliver(origin, text, caption, attachments)

            await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))

            col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})

        except Exception as exp:
            # format_exc() returns the traceback as text; print_exc() only
            # prints it and returns None, which left the report without one.
            from traceback import format_exc
            await context.reply_text(locale("message_traceback", "message").format(context.from_user.id, self.id, exp, format_exc()))
            await context.reply_text(locale("message_error", "message"), quote=should_quote(context))

    async def set_label(self, chat: Chat, label: str) -> None:
        """Set label in destination group

        The label is stored on the instance and in the database, and the user
        is promoted in the destination group. The administrator title is only
        applied when `isAnAdmin` is false for this user and `chat` is the
        configured admin group.

        ### Args:
            * `chat` (`Chat`): Chat compared against the configured `admin_group`
            * `label` (`str`): Label you want to set
        """
        # Assign first so that `set` always finds the attribute.
        self.label = label
        self.set("label", label)
        await app.promote_chat_member(configGet("destination_group"), self.id)
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.set_administrator_title(configGet("destination_group"), self.id, label)

    async def reset_label(self, chat: Chat) -> None:
        """Reset label in destination group

        The label is cleared on the instance and in the database, and the
        administrator title in the destination group is set to an empty
        string. The user's privileges are then replaced with
        `can_manage_chat=False`, but only when `isAnAdmin` is false for this
        user and `chat` is the configured admin group.

        ### Args:
            * `chat` (`Chat`): Chat compared against the configured `admin_group`
        """
        self.label = ""
        self.set("label", "")
        await app.set_administrator_title(configGet("destination_group"), self.id, "")
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.promote_chat_member(configGet("destination_group"), self.id, privileges=ChatPrivileges(
                can_manage_chat=False
            ))