This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/classes/holo_user.py

196 lines
8.6 KiB
Python
Raw Normal View History

2022-12-13 15:33:22 +02:00
from traceback import print_exc
2022-12-12 00:31:36 +02:00
from app import app, isAnAdmin
2022-12-11 20:12:41 +02:00
from typing import Any, List, Union
2022-12-13 15:24:31 +02:00
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice
2022-12-11 02:31:17 +02:00
from pyrogram.client import Client
2022-12-12 00:31:36 +02:00
from pyrogram.errors import bad_request_400
from modules.database import col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
2022-12-11 20:12:41 +02:00
from modules.logging import logWrite
2022-12-12 00:31:36 +02:00
from modules.utils import configGet, locale, should_quote
2022-12-10 13:08:30 +02:00
class UserNotFoundError(Exception):
    """Raised when no database record matches the user ID HoloUser resolved.

    ### Attributes:
        * `user`: The object that was originally passed in to look the user up
        * `user_id`: The ID that was extracted from `user` and searched for
    """

    def __init__(self, user, user_id):
        # Keep both the raw input and the resolved ID so handlers can inspect them
        self.user, self.user_id = user, user_id
        message = f"User of type {type(self.user)} with id {self.user_id} was not found"
        super().__init__(message)
2022-12-11 02:31:17 +02:00
class UserInvalidError(Exception):
    """Raised when the object given to HoloUser is of an unsupported type.

    ### Attributes:
        * `user`: The rejected input object
    """

    def __init__(self, user):
        # Remember the offending input; its type is what the message reports
        self.user = user
        message = f"Could not find HoloUser by using {type(self.user)} as an input type"
        super().__init__(message)
2022-12-10 12:37:15 +02:00
class HoloUser():
    """A Telegram user backed by a record in the `col_users` collection.

    ### Attributes:
        * `id`: Telegram user ID extracted from the constructor argument
        * `db_id`: `_id` of the matching database record
        * `link`, `label`: Values of the `link` and `label` record fields
        * `name`, `phone`, `locale`, `username`: Values of the `tg_name`,
          `tg_phone`, `tg_locale` and `tg_username` record fields
    """

    def __init__(self, user: Union[List[User], User, ChatMember, int, str]) -> None:
        """Resolve `user` to a Telegram ID and load its database record.

        ### Args:
            * `user` (`Union[List[User], User, ChatMember, int, str]`): A non-empty
              list (its first element is used), a `User`, a `ChatMember`, a numeric
              ID, or a string that is passed to `app.get_users`

        ### Raises:
            * `UserInvalidError`: The input type is unsupported (including an empty
              list) or the string lookup was rejected by Telegram
            * `UserNotFoundError`: No record with this user ID exists in `col_users`
        """

        # Determine input object class and extract id
        if isinstance(user, list) and len(user) != 0:
            self.id = user[0].id
        elif isinstance(user, User):
            self.id = user.id
        elif isinstance(user, ChatMember):
            self.id = user.user.id
        elif isinstance(user, int):
            self.id = user
        elif isinstance(user, str):
            try:
                # NOTE(review): `get_users` is called without `await` inside a
                # synchronous constructor - confirm the client is usable this way.
                self.id = (app.get_users(user)).id # this line requires testing though
            except (bad_request_400.UsernameNotOccupied, bad_request_400.PeerIdInvalid) as exp:
                # Both errors mean the string could not be resolved to a user
                raise UserInvalidError(user) from exp
        else:
            raise UserInvalidError(user)

        # Find user record in DB
        holo_user = col_users.find_one({"user": self.id})

        if holo_user is None:
            raise UserNotFoundError(user=user, user_id=self.id)

        self.db_id = holo_user["_id"]

        self.link = holo_user["link"]
        self.label = holo_user["label"]
        self.name = holo_user["tg_name"]
        self.phone = holo_user["tg_phone"]
        self.locale = holo_user["tg_locale"]
        self.username = holo_user["tg_username"]

    def set(self, key: str, value: Any) -> None:
        """Set attribute data and save it into database

        ### Args:
            * `key` (`str`): Attribute to be changed
            * `value` (`Any`): Value to set

        ### Raises:
            * `AttributeError`: This object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"'{type(self).__name__}' object has no attribute '{key}'")
        setattr(self, key, value)
        # NOTE(review): `key` is written to the database as-is, while __init__ reads
        # `name`/`phone`/`locale`/`username` from `tg_*` fields - confirm that writing
        # e.g. "name" rather than "tg_name" is intended.
        col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { key: value } }, upsert=True)
        logWrite(f"Set attribute {key} of user {self.id} to {value}")

    async def message(self,
        context: Message,
        origin: Union[Message, None] = None,
        text: Union[str, None] = None,
        caption: Union[str, None] = None,
        photo: Union[str, Photo, None] = None,
        video: Union[str, Video, None] = None,
        file: Union[str, Document, None] = None,
        animation: Union[str, Animation, None] = None,
        voice: Union[str, Voice, None] = None,
        adm_origin: bool = False,
        adm_context: bool = False
    ) -> None:
        """Send a text or media message to this user and record the message pair

        The first media argument that is not `None` is sent, checked in the order
        photo, video, file, animation, voice; with no media a text message is sent.
        On success a confirmation is replied to `context` and the origin/destination
        message IDs are stored in `col_messages`. Any exception is caught and
        reported back to `context` instead of being raised.

        ### Args:
            * `context` (`Message`): Message that triggered the send; used for logging, confirmation and error replies
            * `origin` (`Union[Message, None]`, *optional*): Message to reply to. When `None`, the message is sent directly to this user's ID
            * `text` (`Union[str, None]`, *optional*): Text for a plain message. When `None`, only the reply notice is used
            * `caption` (`Union[str, None]`, *optional*): Caption for media. When `None`, only the reply notice is used
            * `photo`, `video`, `file`, `animation`, `voice` (*optional*): Media object or a string accepted by the matching send method
            * `adm_origin` (`bool`, *optional*): Prefix text/caption with the localized "message_from" header (sender name and ID) and append the reply notice
            * `adm_context` (`bool`, *optional*): Only append the reply notice; takes precedence over `adm_origin`
        """

        # Local import: the file's top-level import only brings in `print_exc`
        from traceback import format_exc

        if text is not None:
            logWrite(f"{context.from_user.id} sent message '{text}' to {self.id}")
        elif caption is not None:
            logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id}")
        else:
            logWrite(f"{context.from_user.id} sent message to {self.id}")

        if text is not None:
            if adm_context:
                text += locale("message_reply_notice", "message")
            elif adm_origin:
                text = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + text + locale("message_reply_notice", "message")
        else:
            text = locale("message_reply_notice", "message")

        if caption is not None:
            if adm_context:
                caption += locale("message_reply_notice", "message")
            elif adm_origin:
                caption = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + caption + locale("message_reply_notice", "message")
        else:
            caption = locale("message_reply_notice", "message")

        try:

            if origin is not None:

                # Reply to the given message, quoting it
                if photo is not None:
                    if isinstance(photo, Photo):
                        photo = photo.file_id
                    new_message = await origin.reply_photo(photo, caption=caption, quote=True)
                elif video is not None:
                    if isinstance(video, Video):
                        video = video.file_id
                    new_message = await origin.reply_video(video, caption=caption, quote=True)
                elif file is not None:
                    if isinstance(file, Document):
                        file = file.file_id
                    new_message = await origin.reply_document(file, caption=caption, quote=True)
                elif animation is not None:
                    if isinstance(animation, Animation):
                        animation = animation.file_id
                    new_message = await origin.reply_animation(animation, caption=caption, quote=True)
                elif voice is not None:
                    if isinstance(voice, Voice):
                        voice = voice.file_id
                    new_message = await origin.reply_voice(voice, caption=caption, quote=True)
                else:
                    new_message = await origin.reply_text(text, quote=True)

            else:

                # No message to reply to: send straight to the user's chat
                if photo is not None:
                    if isinstance(photo, Photo):
                        photo = photo.file_id
                    new_message = await app.send_photo(self.id, photo, caption=caption)
                elif video is not None:
                    if isinstance(video, Video):
                        video = video.file_id
                    new_message = await app.send_video(self.id, video, caption=caption)
                elif file is not None:
                    if isinstance(file, Document):
                        file = file.file_id
                    new_message = await app.send_document(self.id, file, caption=caption)
                elif animation is not None:
                    if isinstance(animation, Animation):
                        animation = animation.file_id
                    # The chat ID comes first, as in the sibling send_* calls;
                    # `quote` is only accepted by the Message.reply_* helpers
                    new_message = await app.send_animation(self.id, animation, caption=caption)
                elif voice is not None:
                    if isinstance(voice, Voice):
                        voice = voice.file_id
                    new_message = await app.send_voice(self.id, voice, caption=caption)
                else:
                    new_message = await app.send_message(self.id, text)

            await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))
            # Remember which incoming message produced which outgoing one
            col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})

        except Exception as exp:
            # print_exc() only writes to stderr and returns None, so the traceback
            # text for the report has to come from format_exc()
            print_exc()
            await context.reply_text(locale("message_traceback", "message").format(context.from_user.id, self.id, exp, format_exc()))
            await context.reply_text(locale("message_error", "message"), quote=should_quote(context))

    async def set_label(self, chat: Chat, label: str) -> None:
        """Set label in destination group

        Stores the label in the database and promotes the user in the configured
        destination group. The administrator title is only set when `isAnAdmin`
        is false for the user and `chat` is the configured admin group.

        ### Args:
            * `chat` (`Chat`): Chat the request came from
            * `label` (`str`): Label you want to set
        """
        self.label = label
        self.set("label", label)
        await app.promote_chat_member(configGet("destination_group"), self.id)
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.set_administrator_title(configGet("destination_group"), self.id, label)

    async def reset_label(self, chat: Chat) -> None:
        """Reset label in destination group

        Clears the stored label and the administrator title in the configured
        destination group. The `can_manage_chat` privilege is only withdrawn when
        `isAnAdmin` is false for the user and `chat` is the configured admin group.

        ### Args:
            * `chat` (`Chat`): Chat the request came from
        """
        self.label = ""
        self.set("label", "")
        await app.set_administrator_title(configGet("destination_group"), self.id, "")
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.promote_chat_member(configGet("destination_group"), self.id, privileges=ChatPrivileges(
                can_manage_chat=False
            ))