from datetime import datetime from asyncio import sleep from ftfy import fix_text from traceback import format_exc from app import app, isAnAdmin from typing import Any, List, Literal, Union from pyrogram.types import ( User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice, ForceReply, ReplyKeyboardMarkup, ) from pyrogram.errors import bad_request_400 from dateutil.relativedelta import relativedelta from classes.errors.geo import PlaceNotFoundError from classes.errors.holo_user import ( UserInvalidError, UserNotFoundError, LabelTooLongError, LabelSettingError, ) from classes.templates import DefaultApplicationTemp, DefaultSponsorshipTemp from modules.database import ( col_tmp, col_users, col_applications, col_sponsorships, col_messages, col_spoilers, ) from modules.logging import logWrite from modules.utils import configGet, find_location, locale, should_quote class HoloUser: """This object represents a user of HoloChecker bot. It is primarily used to interact with a database in a more python-friendly way, as well as provide better programming experience in case of adding new features, etc. """ def __init__(self, user: Union[User, List[User], ChatMember, int]) -> None: """A user of Holo bot. Used to simplify DB interaction. ### Args: * user (`Union[User, List[User], ChatMember, int]`): Any possible way to identify the user. Pass `User` object, user's telegram ID, etc... ### Raises: * UserInvalidError: Provided to `HoloUser` object is not supported * UserNotFoundError: `HoloUser` could not find user with such an ID in database """ # Determine input object class and extract id if isinstance(user, list) and len(user) != 0: self.id = user[0].id elif isinstance(user, User): self.id = user.id elif isinstance(user, ChatMember): self.id = user.user.id elif isinstance(user, int): self.id = user # elif isinstance(user, str): # try: # get_users = async_to_sync(app.get_users) # self.id = get_users(user).id # this line requires testing though # except bad_request_400.UsernameNotOccupied: # raise UserInvalidError(user) # except bad_request_400.PeerIdInvalid: # raise UserInvalidError(user) else: raise UserInvalidError(user) # Find user record in DB holo_user = col_users.find_one({"user": self.id}) if holo_user is None: raise UserNotFoundError(user=user, user_id=self.id) self.db_id = holo_user["_id"] self.link = holo_user["link"] self.label = holo_user["label"] self.name = holo_user["tg_name"] self.phone = holo_user["tg_phone"] self.locale = holo_user["tg_locale"] self.username = holo_user["tg_username"] if isinstance(user, User): if ( (self.name != user.first_name) and hasattr(user, "first_name") and (user.first_name is not None) ): self.set("name", user.first_name, db_key="tg_name") if ( (self.phone != user.phone_number) and hasattr(user, "phone") and (user.phone_number is not None) ): self.set("phone", user.phone_number, db_key="tg_phone") if ( (self.locale != user.language_code) and hasattr(user, "locale") and (user.language_code is not None) ): self.set("locale", user.language_code, db_key="tg_locale") if ( (self.username != user.username) and hasattr(user, "username") and (user.username is not None) ): self.set("username", user.username, db_key="tg_username") def set(self, key: str, value: Any, db_key: Union[str, None] = None) -> None: """Set attribute data and save it into database ### Args: * `key` (`str`): Attribute to be changed * `value` (`Any`): Value to set """ if not hasattr(self, key): raise AttributeError() setattr(self, key, value) db_key = key if db_key is None else db_key col_users.update_one( filter={"_id": self.db_id}, update={"$set": {db_key: value}}, upsert=True ) logWrite(f"Set attribute {key} of user {self.id} to {value}") async def message( self, context: Message, origin: Union[Message, None] = None, text: Union[str, None] = None, caption: Union[str, None] = None, photo: Union[str, Photo, None] = None, video: Union[str, Video, None] = None, file: Union[str, Document, None] = None, animation: Union[str, Animation, None] = None, voice: Union[str, Voice, None] = None, adm_origin: bool = False, adm_context: bool = False, ) -> None: """Send a message to user ### Args: * context (`Message`): Context (mostly the message where this method is called) * origin (`Union[Message, None]`, *optional*): Origin message where to refer. None if called in a command. Defaults to None. * text (`Union[str, None]`, *optional*): Text if this is a simple text message. Defaults to None. * caption (`Union[str, None]`, *optional*): Text if this is a media message. Defaults to None. * photo (`Union[str, Photo, None]`, *optional*): Photo as a photo object or file_id as a string. Defaults to None. * video (`Union[str, Video, None]`, *optional*): Video as a video object or file_id as a string. Defaults to None. * file (`Union[str, Document, None]`, *optional*): File as a document object or file_id as a string. Defaults to None. * animation (`Union[str, Animation, None]`, *optional*): Animation as an animation object or file_id as a string. Defaults to None. * voice (`Union[str, Voice, None]`, *optional*): Voice as a voice object or file_id as a string. Defaults to None. * adm_origin (`bool`, *optional*): Whether origin sender is an admin. Defaults to False. * adm_context (`bool`, *optional*): Whether context sender is an admin. Defaults to False. """ if text is not None: text = fix_text(text) elif caption is not None: caption = fix_text(caption) # Check if any text available and log message sending if text is not None: logWrite( f"{context.from_user.id} sent message '{text}' to {self.id} (source message: {context.id})" ) elif caption is not None: logWrite( f"{context.from_user.id} sent message '{caption}' to {self.id} (source message: {context.id})" ) else: logWrite( f"{context.from_user.id} sent message to {self.id} (source message: {context.id})" ) # Add notices for admin or user if text is not None: if adm_context: text += locale("message_reply_notice", "message") elif adm_origin: text = ( locale("message_from", "message").format( context.from_user.first_name, context.from_user.id ) + text + locale("message_reply_notice", "message") ) else: text = locale("message_reply_notice", "message") if caption is not None: if adm_context: caption += locale("message_reply_notice", "message") elif adm_origin: caption = ( locale("message_from", "message").format( context.from_user.first_name, context.from_user.id ) + caption + locale("message_reply_notice", "message") ) else: caption = locale("message_reply_notice", "message") # Try sending the message try: # Check if origin message exists # This check decides whether we send_ a message or reply_ to one if origin is not None: if photo is not None: if isinstance(photo, Photo): photo = photo.file_id new_message = await origin.reply_cached_media( photo, caption=caption, quote=True ) elif video is not None: if isinstance(video, Video): video = video.file_id new_message = await origin.reply_cached_media( video, caption=caption, quote=True ) elif file is not None: if isinstance(file, Document): file = file.file_id new_message = await origin.reply_cached_media( file, caption=caption, quote=True ) elif animation is not None: if isinstance(animation, Animation): animation = animation.file_id new_message = await origin.reply_cached_media( animation, caption=caption, quote=True ) elif voice is not None: if isinstance(voice, Voice): voice = voice.file_id new_message = await origin.reply_voice( voice, caption=caption, quote=True ) else: new_message = await origin.reply_text(text, quote=True) else: if photo is not None: if isinstance(photo, Photo): photo = photo.file_id new_message = await app.send_cached_media( self.id, photo, caption=caption ) elif video is not None: if isinstance(video, Video): video = video.file_id new_message = await app.send_cached_media( self.id, video, caption=caption ) elif file is not None: if isinstance(file, Document): file = file.file_id new_message = await app.send_cached_media( self.id, file, caption=caption ) elif animation is not None: if isinstance(animation, Animation): animation = animation.file_id new_message = await app.send_cached_media( self.id, animation, caption=caption ) elif voice is not None: if isinstance(voice, Voice): voice = voice.file_id new_message = await app.send_cached_media( self.id, voice, caption=caption ) else: new_message = await app.send_message(self.id, text) # Acknowledge sending a message and save entry into DB await context.reply_text( locale("message_sent", "message"), quote=should_quote(context) ) col_messages.insert_one( { "origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}, } ) # Report to admin and to sender about message sending failure except Exception as exp: logWrite( f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{format_exc()}" ) try: await app.send_message( configGet("owner"), locale("message_traceback", "message").format( context.from_user.id, self.id, exp, format_exc() ), ) except bad_request_400.PeerIdInvalid: logWrite( f"Could not notify admin about failure when sending message! Admin has never interacted with bot!" ) await context.reply_text( locale("message_error", "message"), quote=should_quote(context) ) async def label_set(self, chat: Chat, label: str) -> None: """Set label in destination group ### Args: * chat (`Chat`): Telegram chat * label (`str`): Label you want to set """ if len(label) > 16: raise LabelTooLongError(label) self.label = label try: await app.promote_chat_member( configGet("users", "groups"), self.id, privileges=ChatPrivileges( can_pin_messages=True, can_manage_video_chats=True ), ) if not await isAnAdmin(self.id): await sleep(0.5) await app.set_administrator_title( configGet("users", "groups"), self.id, label ) self.set("label", label) except Exception as exp: logWrite(f"Could not set {self.id}'s title to '{self.label}' due to {exp}") raise LabelSettingError(exp, format_exc()) async def label_reset(self, chat: Chat) -> None: """Reset label in destination group ### Args: * chat (`Chat`): Telegram chat """ self.label = "" self.set("label", "") await app.set_administrator_title(configGet("users", "groups"), self.id, "") if not await isAnAdmin(self.id): await app.promote_chat_member( configGet("users", "groups"), self.id, privileges=ChatPrivileges( can_manage_chat=False, can_pin_messages=False, can_manage_video_chats=False, ), ) def application_state( self, ) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]: """Check the current state of application in tmp collection ### Returns: * `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether application is complete. """ tmp_application = col_tmp.find_one({"user": self.id, "type": "application"}) if tmp_application is None: return "none", False else: return tmp_application["state"], tmp_application["complete"] def application_approved(self) -> bool: """Check whether user has a completed application and it got approved ### Returns: * `bool`: `True` if yes and `False` if no """ return ( True if col_applications.find_one({"user": self.id}) is not None else False ) def application_restart(self, reapply: bool = False) -> None: """Reset application of a user in tmp collection and replace it with an empty one""" if col_tmp.find_one({"user": self.id, "type": "application"}) is None: col_tmp.insert_one( document=DefaultApplicationTemp(self.id, reapply=reapply).dict ) else: col_tmp.find_one_and_replace( {"user": self.id, "type": "application"}, DefaultApplicationTemp(self.id, reapply=reapply).dict, ) async def application_next(self, query: str, msg: Message) -> None: """Move on filling application of user ### Args: * query (`str`): Some kind of input * msg (`Message`): Message that should receive replies """ # if col_tmp.find_one({"user": self.id, "type": "application"}) is None: if self.sponsorship_state()[0] == "fill": return if self.spoiler_state() is True: return # col_tmp.insert_one( # document=DefaultApplicationTemp(self.id).dict # ) progress = col_tmp.find_one({"user": self.id, "type": "application"}) if progress is None: return stage = progress["stage"] # if self.sponsorship_state()[0] == "fill": # await msg.reply_text(locale("finish_sponsorship", "message"), quote=should_quote(msg)) # return if progress["state"] == "fill" and progress["sent"] is False: if msg.text is not None: msg.text = fix_text(str(msg.text)) if stage == 2: try: input_dt = datetime.strptime(query, "%d.%m.%Y") except ValueError: logWrite( f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format" ) await msg.reply_text( locale(f"question2_invalid", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage}", "force_reply", locale=self.locale, ) ) ), ) return if (datetime.now() <= input_dt) or ( (datetime.now() - input_dt).days ) > ( ( datetime.now() - datetime.now().replace( year=datetime.now().year - configGet("age_maximum") ) ).days ): logWrite( f"User {msg.from_user.id} failed stage {stage} due to joking" ) await msg.reply_text( locale("question2_joke", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale("question2", "force_reply", locale=self.locale) ) ), ) return elif ((datetime.now() - input_dt).days) < ( ( datetime.now() - datetime.now().replace( year=datetime.now().year - configGet("age_allowed") ) ).days ): logWrite( f"User {msg.from_user.id} failed stage {stage} due to being underage" ) await msg.reply_text( locale( "question2_underage", "message", locale=self.locale ).format(str(configGet("age_allowed"))), reply_markup=ForceReply( placeholder=str( locale("question2", "force_reply", locale=self.locale) ) ), ) return else: progress["application"][str(stage)] = input_dt col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "application"}}, { "$set": { "application": progress["application"], "stage": progress["stage"] + 1, } }, ) await msg.reply_text( locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage+1}", "force_reply", locale=self.locale, ) ) ), ) elif stage == 3: try: progress["application"][str(stage)] = find_location(query) if ( "lat" in progress["application"][str(stage)] and "lng" in progress["application"][str(stage)] ): progress["application"][str(stage)]["location"] = [ float(progress["application"][str(stage)]["lng"]), float(progress["application"][str(stage)]["lat"]), ] del progress["application"][str(stage)]["lat"] del progress["application"][str(stage)]["lng"] col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "application"}}, { "$set": { "application": progress["application"], "stage": progress["stage"] + 1, } }, ) await msg.reply_text( locale("question3_found", "message", locale=self.locale).format( progress["application"][str(stage)]["name"], progress["application"][str(stage)]["adminName1"], ) ) await msg.reply_text( locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage+1}", "force_reply", locale=self.locale, ) ) ), ) except PlaceNotFoundError: await msg.reply_text( locale("question3_invalid", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage}", "force_reply", locale=self.locale, ) ) ), ) return except Exception as exp: await msg.reply_text( locale("question3_error", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage}", "force_reply", locale=self.locale, ) ) ), ) try: await app.send_message( configGet("owner"), locale( "question3_traceback", "message", locale=self.locale ).format(query, exp, format_exc()), ) except bad_request_400.PeerIdInvalid: logWrite( f"Could not notify admin about failure when sending message! Admin has never interacted with bot!" ) return elif stage == 10: if len(query) > 1024: await msg.reply_text( locale("question10_too_long", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage}", "force_reply", locale=self.locale, ) ) ), ) return progress["application"][str(stage)] = query col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "application"}}, { "$set": { "application": progress["application"], "complete": True, } }, ) application_content = [] i = 1 for question in progress["application"]: if i == 2: age = relativedelta( datetime.now(), progress["application"]["2"] ) application_content.append( f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)" ) elif i == 3: if progress["application"]["3"]["countryCode"] == "UA": application_content.append( f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']})" ) else: application_content.append( f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']}, {progress['application']['3']['countryName']})" ) else: application_content.append( f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application'][question]}" ) i += 1 await msg.reply_text( locale("confirm", "message", locale=self.locale).format( "\n".join(application_content) ), reply_markup=ReplyKeyboardMarkup( locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True, ), ) else: if len(query) > 256: await msg.reply_text( locale("question_too_long", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage}", "force_reply", locale=self.locale, ) ) ), ) return progress["application"][str(stage)] = query col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "application"}}, { "$set": { "application": progress["application"], "stage": progress["stage"] + 1, } }, ) await msg.reply_text( locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"question{stage+1}", "force_reply", locale=self.locale ) ) ), ) logWrite(f"User {self.id} completed stage {stage} of application") else: return def sponsorship_state( self, ) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]: """Check the current state of sponsorship in tmp collection ### Returns: * `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether sponsorship application is complete. """ tmp_sponsorship = col_tmp.find_one({"user": self.id, "type": "sponsorship"}) if tmp_sponsorship is None: return "none", False else: return tmp_sponsorship["state"], tmp_sponsorship["complete"] def sponsorship_valid(self) -> bool: """Check whether user has a valid sponsorship ### Returns: * `bool`: `True` if yes and `False` if no """ return ( True if col_sponsorships.find_one( {"user": self.id, "expires": {"$gt": datetime.now()}} ) is not None else False ) def sponsorship_restart(self) -> None: """Reset sponsorship of a user in tmp collection and replace it with an empty one""" if col_tmp.find_one({"user": self.id, "type": "sponsorship"}) is None: col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict) else: col_tmp.delete_one({"user": self.id, "type": "sponsorship"}) col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict) async def sponsorship_next( self, query: str, msg: Message, photo: Union[Photo, None] = None ) -> None: """Move on filling sponsorship of user ### Args: * query (`str`): Some kind of input * msg (`Message`): Message that should receive replies """ progress = col_tmp.find_one({"user": self.id, "type": "sponsorship"}) if progress is not None: stage = progress["stage"] if msg.text is not None: msg.text = fix_text(str(msg.text)) elif msg.caption is not None: msg.caption = fix_text(msg.caption) if progress["state"] == "fill" and progress["sent"] is False: if stage == 1: if len(query) > 240: logWrite( f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format" ) await msg.reply_text( locale(f"sponsor1_invalid", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"sponsor{stage}", "force_reply", locale=self.locale, ) ) ), ) return progress["sponsorship"]["streamer"] = query col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, { "$set": { "sponsorship": progress["sponsorship"], "stage": progress["stage"] + 1, } }, ) await msg.reply_text( locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"sponsor{stage+1}", "force_reply", locale=self.locale, ) ) ), ) elif stage == 2: try: input_dt = datetime.strptime(query, "%d.%m.%Y") except ValueError: logWrite( f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format" ) await msg.reply_text( locale(f"sponsor2_invalid", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"sponsor{stage}", "force_reply", locale=self.locale, ) ) ), ) return if datetime.now() >= input_dt: logWrite( f"User {msg.from_user.id} failed stage {stage} due to sending date in the past" ) await msg.reply_text( locale("sponsor2_past", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( "sponsor2", "force_reply", locale=self.locale ) ) ), ) return else: progress["sponsorship"]["expires"] = input_dt col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, { "$set": { "sponsorship": progress["sponsorship"], "stage": progress["stage"] + 1, } }, ) await msg.reply_text( locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"sponsor{stage+1}", "force_reply", locale=self.locale, ) ) ), ) elif stage == 3: if photo is not None: progress["sponsorship"]["proof"] = photo.file_id col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, { "$set": { "sponsorship": progress["sponsorship"], "stage": progress["stage"] + 1, } }, ) await msg.reply_text( locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply( placeholder=str( locale( f"sponsor{stage+1}", "force_reply", locale=self.locale, ) ) ), ) elif stage == 4: if len(query) > 16: await msg.reply_text( locale("label_too_long", "message"), reply_markup=ForceReply( placeholder=str( locale( "sponsor4", "force_reply", locale=self.locale ) ) ), ) return progress["sponsorship"]["label"] = query col_tmp.update_one( {"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, { "$set": { "sponsorship": progress["sponsorship"], "complete": True, } }, ) await msg.reply_cached_media( progress["sponsorship"]["proof"], caption=locale( "sponsor_confirm", "message", locale=self.locale ).format( progress["sponsorship"]["streamer"], progress["sponsorship"]["expires"].strftime("%d.%m.%Y"), progress["sponsorship"]["label"], ), reply_markup=ReplyKeyboardMarkup( locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True, ), ) else: return logWrite(f"User {self.id} completed stage {stage} of sponsorship") else: return def spoiler_state(self) -> bool: """Check if user has any started but not finished spoilers ### Returns: * `bool`: `True` if any not finished spoilers available and `False` if none. """ return ( False if col_spoilers.find_one({"user": self.id, "completed": False}) is None else True )