from datetime import datetime
from asyncio import sleep
from ftfy import fix_text
from traceback import format_exc
from app import app, isAnAdmin
from typing import Any, List, Literal, Union
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice, ForceReply, ReplyKeyboardMarkup
from pyrogram.errors import bad_request_400
from dateutil.relativedelta import relativedelta
from classes.errors.geo import PlaceNotFoundError
from classes.errors.holo_user import UserInvalidError, UserNotFoundError, LabelTooLongError, LabelSettingError
from classes.templates import DefaultApplicationTemp, DefaultSponsorshipTemp
from modules.database import col_tmp, col_users, col_applications, col_sponsorships, col_messages, col_spoilers
from modules.logging import logWrite
from modules.utils import configGet, find_location, locale, should_quote


def _days_in_last_years(now: datetime, years: int) -> int:
    """Return the number of whole days between `now` and the same moment `years` years earlier.

    `relativedelta` is used instead of `datetime.replace(year=...)` because the
    latter raises `ValueError` when `now` is February 29th and the target year
    is not a leap year.
    """
    return (now - (now - relativedelta(years=years))).days


class HoloUser():
    """This object represents a user of HoloChecker bot.

    It is primarily used to interact with a database in a more python-friendly way,
    as well as provide better programming experience in case of adding new features, etc.
    """

    def __init__(self, user: Union[User, List[User], ChatMember, int]) -> None:
        """A user of Holo bot. Used to simplify DB interaction.

        ### Args:
            * user (`Union[User, List[User], ChatMember, int]`): Any possible way to identify the user. Pass `User` object, user's telegram ID, etc...

        ### Raises:
            * UserInvalidError: Provided to `HoloUser` object is not supported
            * UserNotFoundError: `HoloUser` could not find user with such an ID in database
        """

        # Determine input object class and extract id
        if isinstance(user, list) and len(user) != 0:
            self.id = user[0].id
        elif isinstance(user, User):
            self.id = user.id
        elif isinstance(user, ChatMember):
            self.id = user.user.id
        elif isinstance(user, int):
            self.id = user
        # elif isinstance(user, str):
        #     try:
        #         get_users = async_to_sync(app.get_users)
        #         self.id = get_users(user).id # this line requires testing though
        #     except bad_request_400.UsernameNotOccupied:
        #         raise UserInvalidError(user)
        #     except bad_request_400.PeerIdInvalid:
        #         raise UserInvalidError(user)
        else:
            raise UserInvalidError(user)

        # Find user record in DB
        holo_user = col_users.find_one({"user": self.id})

        if holo_user is None:
            raise UserNotFoundError(user=user, user_id=self.id)

        self.db_id = holo_user["_id"]

        self.link = holo_user["link"]
        self.label = holo_user["label"]
        self.name = holo_user["tg_name"]
        self.phone = holo_user["tg_phone"]
        self.locale = holo_user["tg_locale"]
        self.username = holo_user["tg_username"]

        # Sync the stored profile with what Telegram reports right now.
        # Each entry is (own attribute, DB key, attribute of the `User` object).
        if isinstance(user, User):
            synced_fields = (
                ("name", "tg_name", "first_name"),
                ("phone", "tg_phone", "phone_number"),
                ("locale", "tg_locale", "language_code"),
                ("username", "tg_username", "username"),
            )
            for attribute, db_key, tg_attribute in synced_fields:
                tg_value = getattr(user, tg_attribute, None)
                if (tg_value is not None) and (getattr(self, attribute) != tg_value):
                    self.set(attribute, tg_value, db_key=db_key)

    def set(self, key: str, value: Any, db_key: Union[str, None] = None) -> None:
        """Set attribute data and save it into database

        ### Args:
            * `key` (`str`): Attribute to be changed
            * `value` (`Any`): Value to set
            * `db_key` (`Union[str, None]`, *optional*): Name of the field in the database record. When None, `key` is used. Defaults to None.

        ### Raises:
            * AttributeError: The object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"{type(self).__name__} has no attribute '{key}'")
        setattr(self, key, value)
        db_key = key if db_key is None else db_key
        col_users.update_one(filter={"_id": self.db_id}, update={"$set": {db_key: value}}, upsert=True)
        logWrite(f"Set attribute {key} of user {self.id} to {value}")

    async def message(self,
            context: Message,
            origin: Union[Message, None] = None,
            text: Union[str, None] = None,
            caption: Union[str, None] = None,
            photo: Union[str, Photo, None] = None,
            video: Union[str, Video, None] = None,
            file: Union[str, Document, None] = None,
            animation: Union[str, Animation, None] = None,
            voice: Union[str, Voice, None] = None,
            adm_origin: bool = False,
            adm_context: bool = False
        ) -> None:
        """Send a message to user

        ### Args:
            * context (`Message`): Context (mostly the message where this method is called)
            * origin (`Union[Message, None]`, *optional*): Origin message where to refer. None if called in a command. Defaults to None.
            * text (`Union[str, None]`, *optional*): Text if this is a simple text message. Defaults to None.
            * caption (`Union[str, None]`, *optional*): Text if this is a media message. Defaults to None.
            * photo (`Union[str, Photo, None]`, *optional*): Photo as a photo object or file_id as a string. Defaults to None.
            * video (`Union[str, Video, None]`, *optional*): Video as a video object or file_id as a string. Defaults to None.
            * file (`Union[str, Document, None]`, *optional*): File as a document object or file_id as a string. Defaults to None.
            * animation (`Union[str, Animation, None]`, *optional*): Animation as an animation object or file_id as a string. Defaults to None.
            * voice (`Union[str, Voice, None]`, *optional*): Voice as a voice object or file_id as a string. Defaults to None.
            * adm_origin (`bool`, *optional*): Whether origin sender is an admin. Defaults to False.
            * adm_context (`bool`, *optional*): Whether context sender is an admin. Defaults to False.
        """

        if text is not None:
            text = fix_text(text)
        elif caption is not None:
            caption = fix_text(caption)

        # Check if any text available and log message sending
        if text is not None:
            logWrite(f"{context.from_user.id} sent message '{text}' to {self.id} (source message: {context.id})")
        elif caption is not None:
            logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id} (source message: {context.id})")
        else:
            logWrite(f"{context.from_user.id} sent message to {self.id} (source message: {context.id})")

        # Add notices for admin or user
        if text is not None:
            if adm_context:
                text += locale("message_reply_notice", "message")
            elif adm_origin:
                text = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + text + locale("message_reply_notice", "message")
        else:
            text = locale("message_reply_notice", "message")

        if caption is not None:
            if adm_context:
                caption += locale("message_reply_notice", "message")
            elif adm_origin:
                caption = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + caption + locale("message_reply_notice", "message")
        else:
            caption = locale("message_reply_notice", "message")

        # Try sending the message
        try:

            # Pick the first provided attachment (same priority as the argument order)
            # and reduce it to a file_id string.
            media_id = None
            is_voice = False
            attachments = (
                (photo, Photo),
                (video, Video),
                (file, Document),
                (animation, Animation),
                (voice, Voice),
            )
            for candidate, media_type in attachments:
                if candidate is None:
                    continue
                media_id = candidate.file_id if isinstance(candidate, media_type) else candidate
                is_voice = media_type is Voice
                break

            # Check if origin message exists
            # This check decides whether we send_ a message or reply_ to one
            if origin is not None:
                if media_id is None:
                    new_message = await origin.reply_text(text, quote=True)
                elif is_voice:
                    new_message = await origin.reply_voice(media_id, caption=caption, quote=True)
                else:
                    new_message = await origin.reply_cached_media(media_id, caption=caption, quote=True)
            elif media_id is None:
                new_message = await app.send_message(self.id, text)
            else:
                new_message = await app.send_cached_media(self.id, media_id, caption=caption)

            # Acknowledge sending a message and save entry into DB
            await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))
            col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})

        # Report to admin and to sender about message sending failure
        except Exception as exp:
            logWrite(f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{format_exc()}")
            try:
                await app.send_message(configGet("owner"), locale("message_traceback", "message").format(context.from_user.id, self.id, exp, format_exc()))
            except bad_request_400.PeerIdInvalid:
                logWrite("Could not notify admin about failure when sending message! Admin has never interacted with bot!")
            await context.reply_text(locale("message_error", "message"), quote=should_quote(context))

    async def label_set(self, chat: Chat, label: str) -> None:
        """Set label in destination group

        ### Args:
            * chat (`Chat`): Telegram chat (not used by the method itself, the group is taken from config)
            * label (`str`): Label you want to set

        ### Raises:
            * LabelTooLongError: Label is longer than 16 characters
            * LabelSettingError: Promoting the user or setting the title failed
        """
        if len(label) > 16:
            raise LabelTooLongError(label)
        self.label = label
        try:
            await app.promote_chat_member(configGet("users", "groups"), self.id, privileges=ChatPrivileges(can_pin_messages=True, can_manage_video_chats=True))
            if not await isAnAdmin(self.id):
                # Short pause between the promotion and setting the title
                await sleep(0.5)
                await app.set_administrator_title(configGet("users", "groups"), self.id, label)
            self.set("label", label)
        except Exception as exp:
            logWrite(f"Could not set {self.id}'s title to '{self.label}' due to {exp}")
            raise LabelSettingError(exp, format_exc())

    async def label_reset(self, chat: Chat) -> None:
        """Reset label in destination group

        ### Args:
            * chat (`Chat`): Telegram chat (not used by the method itself, the group is taken from config)
        """
        self.label = ""
        self.set("label", "")
        await app.set_administrator_title(configGet("users", "groups"), self.id, "")
        if not await isAnAdmin(self.id):
            await app.promote_chat_member(configGet("users", "groups"), self.id, privileges=ChatPrivileges(
                can_manage_chat=False,
                can_pin_messages=False,
                can_manage_video_chats=False
            ))

    def application_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
        """Check the current state of application in tmp collection

        ### Returns:
            * `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether application is complete.
        """
        tmp_application = col_tmp.find_one({"user": self.id, "type": "application"})
        if tmp_application is None:
            return "none", False
        return tmp_application["state"], tmp_application["complete"]

    def application_approved(self) -> bool:
        """Check whether user has a completed application and it got approved

        ### Returns:
            * `bool`: `True` if yes and `False` if no
        """
        return col_applications.find_one({"user": self.id}) is not None

    def application_restart(self, reapply: bool = False) -> None:
        """Reset application of a user in tmp collection and replace it with an empty one

        ### Args:
            * reapply (`bool`, *optional*): Passed through to `DefaultApplicationTemp`. Defaults to False.
        """
        tmp_filter = {"user": self.id, "type": "application"}
        empty_application = DefaultApplicationTemp(self.id, reapply=reapply).dict
        if col_tmp.find_one(tmp_filter) is None:
            col_tmp.insert_one(document=empty_application)
        else:
            col_tmp.find_one_and_replace(tmp_filter, empty_application)

    async def application_next(self, query: str, msg: Message) -> None:
        """Move on filling application of user

        Stage 2 expects a birth date (`DD.MM.YYYY`), stage 3 a location name,
        stage 10 is the last one (up to 1024 characters) and every other stage
        accepts free text of up to 256 characters.

        ### Args:
            * query (`str`): Some kind of input
            * msg (`Message`): Message that should receive replies
        """

        # Application is not processed while a sponsorship form or a spoiler is being filled
        if self.sponsorship_state()[0] == "fill":
            return

        if self.spoiler_state() is True:
            return

        progress = col_tmp.find_one({"user": self.id, "type": "application"})

        if progress is None:
            return

        stage = progress["stage"]

        if not (progress["state"] == "fill" and progress["sent"] is False):
            return

        if msg.text is not None:
            msg.text = fix_text(msg.text)

        tmp_filter = {"user": {"$eq": self.id}, "type": {"$eq": "application"}}
        application = progress["application"]

        if stage == 2:

            try:
                input_dt = datetime.strptime(query, "%d.%m.%Y")
            except ValueError:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
                await msg.reply_text(locale("question2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
                return

            now = datetime.now()
            age_days = (now - input_dt).days

            if (now <= input_dt) or (age_days > _days_in_last_years(now, configGet("age_maximum"))):
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to joking")
                await msg.reply_text(locale("question2_joke", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
                return

            elif age_days < _days_in_last_years(now, configGet("age_allowed")):
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to being underage")
                await msg.reply_text(locale("question2_underage", "message", locale=self.locale).format(str(configGet("age_allowed"))), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
                return

            else:
                application[str(stage)] = input_dt
                col_tmp.update_one(tmp_filter, {"$set": {"application": application, "stage": progress["stage"]+1}})
                await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))

        elif stage == 3:

            try:
                place = find_location(query)
                application[str(stage)] = place
                # Replace separate "lat"/"lng" fields with a single [lng, lat] pair
                if ("lat" in place and "lng" in place):
                    place["location"] = [float(place["lng"]), float(place["lat"])]
                    del place["lat"]
                    del place["lng"]
                col_tmp.update_one(tmp_filter, {"$set": {"application": application, "stage": progress["stage"]+1}})
                await msg.reply_text(locale("question3_found", "message", locale=self.locale).format(place["name"], place["adminName1"]))
                await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
            except PlaceNotFoundError:
                await msg.reply_text(locale("question3_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
                return
            except Exception as exp:
                await msg.reply_text(locale("question3_error", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
                try:
                    await app.send_message(configGet("owner"), locale("question3_traceback", "message", locale=self.locale).format(query, exp, format_exc()))
                except bad_request_400.PeerIdInvalid:
                    logWrite("Could not notify admin about failure when sending message! Admin has never interacted with bot!")
                return

        elif stage == 10:

            if len(query) > 1024:
                await msg.reply_text(locale("question10_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
                return

            application[str(stage)] = query
            col_tmp.update_one(tmp_filter, {"$set": {"application": application, "complete": True}})

            # Build a summary of all answers and ask the user to confirm it
            application_content = []

            for index, question in enumerate(application, start=1):

                title = locale(f"question{index}", "message", "question_titles", locale=self.locale)

                if index == 2:
                    age = relativedelta(datetime.now(), application['2'])
                    application_content.append(f"{title} {application['2'].strftime('%d.%m.%Y')} ({age.years} р.)")
                elif index == 3:
                    # Country name is only shown for places outside of Ukraine
                    if application['3']['countryCode'] == "UA":
                        application_content.append(f"{title} {application['3']['name']} ({application['3']['adminName1']})")
                    else:
                        application_content.append(f"{title} {application['3']['name']} ({application['3']['adminName1']}, {application['3']['countryName']})")
                else:
                    application_content.append(f"{title} {application[question]}")

            await msg.reply_text(locale("confirm", "message", locale=self.locale).format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))

        else:

            if len(query) > 256:
                await msg.reply_text(locale("question_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
                return

            application[str(stage)] = query
            col_tmp.update_one(tmp_filter, {"$set": {"application": application, "stage": progress["stage"]+1}})

            await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))

        logWrite(f"User {self.id} completed stage {stage} of application")

    def sponsorship_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
        """Check the current state of sponsorship in tmp collection

        ### Returns:
            * `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether sponsorship application is complete.
        """
        tmp_sponsorship = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
        if tmp_sponsorship is None:
            return "none", False
        return tmp_sponsorship["state"], tmp_sponsorship["complete"]

    def sponsorship_valid(self) -> bool:
        """Check whether user has a valid sponsorship

        ### Returns:
            * `bool`: `True` if yes and `False` if no
        """
        return col_sponsorships.find_one({"user": self.id, "expires": {"$gt": datetime.now()}}) is not None

    def sponsorship_restart(self) -> None:
        """Reset sponsorship of a user in tmp collection and replace it with an empty one
        """
        tmp_filter = {"user": self.id, "type": "sponsorship"}
        if col_tmp.find_one(tmp_filter) is not None:
            col_tmp.delete_one(tmp_filter)
        col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)

    async def sponsorship_next(self, query: str, msg: Message, photo: Union[Photo, None] = None) -> None:
        """Move on filling sponsorship of user

        Stage 1 stores the streamer (up to 240 characters), stage 2 the expiry
        date (`DD.MM.YYYY`, must be in the future), stage 3 the proof photo and
        stage 4 the label (up to 16 characters).

        ### Args:
            * query (`str`): Some kind of input
            * msg (`Message`): Message that should receive replies
            * photo (`Union[Photo, None]`, *optional*): Photo used as a proof on stage 3. Defaults to None.
        """

        progress = col_tmp.find_one({"user": self.id, "type": "sponsorship"})

        if progress is None:
            return

        stage = progress["stage"]

        if msg.text is not None:
            msg.text = fix_text(msg.text)
        elif msg.caption is not None:
            msg.caption = fix_text(msg.caption)

        if not (progress["state"] == "fill" and progress["sent"] is False):
            return

        tmp_filter = {"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}
        sponsorship = progress["sponsorship"]

        if stage == 1:

            if len(query) > 240:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending too long streamer name")
                await msg.reply_text(locale("sponsor1_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
                return

            sponsorship["streamer"] = query
            col_tmp.update_one(tmp_filter, {"$set": {"sponsorship": sponsorship, "stage": progress["stage"]+1}})
            await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))

        elif stage == 2:

            try:
                input_dt = datetime.strptime(query, "%d.%m.%Y")
            except ValueError:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
                await msg.reply_text(locale("sponsor2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
                return

            if datetime.now() >= input_dt:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending date in the past")
                await msg.reply_text(locale("sponsor2_past", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("sponsor2", "force_reply", locale=self.locale))))
                return

            sponsorship["expires"] = input_dt
            col_tmp.update_one(tmp_filter, {"$set": {"sponsorship": sponsorship, "stage": progress["stage"]+1}})
            await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))

        elif stage == 3:

            # Without a photo nothing is stored, so the stage is not completed
            if photo is None:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to not sending a photo")
                return

            sponsorship["proof"] = photo.file_id
            col_tmp.update_one(tmp_filter, {"$set": {"sponsorship": sponsorship, "stage": progress["stage"]+1}})
            await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))

        elif stage == 4:

            if len(query) > 16:
                await msg.reply_text(locale("label_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("sponsor4", "force_reply", locale=self.locale))))
                return

            sponsorship["label"] = query
            col_tmp.update_one(tmp_filter, {"$set": {"sponsorship": sponsorship, "complete": True}})

            # Show the collected data together with the proof and ask for confirmation
            await msg.reply_cached_media(
                sponsorship["proof"],
                caption=locale("sponsor_confirm", "message", locale=self.locale).format(
                    sponsorship["streamer"],
                    sponsorship["expires"].strftime("%d.%m.%Y"),
                    sponsorship["label"]
                ),
                reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))

        else:
            return

        logWrite(f"User {self.id} completed stage {stage} of sponsorship")

    def spoiler_state(self) -> bool:
        """Check if user has any started but not finished spoilers

        ### Returns:
            * `bool`: `True` if any not finished spoilers available and `False` if none.
        """
        return col_spoilers.find_one({"user": self.id, "completed": False}) is not None