from datetime import datetime
from traceback import format_exc, print_exc
from typing import Any, List, Literal, Union

from dateutil.relativedelta import relativedelta
from pyrogram.errors import bad_request_400
from pyrogram.types import (
    User,
    ChatMember,
    ChatPrivileges,
    Chat,
    Message,
    Photo,
    Video,
    Document,
    Animation,
    Voice,
    ForceReply,
    ReplyKeyboardMarkup,
)
from requests import get

from app import app, isAnAdmin
from modules.database import col_tmp, col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
from modules.logging import logWrite
from modules.utils import configGet, locale, should_quote


# Seconds to wait for the geocoding service before giving up on a request
_GEOCODING_TIMEOUT = 10


def _question_prompt(stage: int) -> ForceReply:
    """Build the `ForceReply` markup whose placeholder hints at question `stage`."""
    return ForceReply(placeholder=str(locale(f"question{stage}", "force_reply")))


class DefaultApplicationTemp(dict):
    """Template of an empty application record for the tmp collection.

    The document itself is exposed through the `dict` attribute; the mapping
    of the instance is intentionally left empty, exactly as callers expect.
    """

    def __init__(self, user: int):
        """Create an empty application template.

        ### Args:
            * user (`int`): Telegram ID of the user the application belongs to
        """
        super().__init__({})
        self.dict = {
            "user": user,
            "type": "application",
            "complete": False,
            "sent": False,
            "state": "fill",
            "reapply": False,
            "stage": 1,
            # Answers to questions "1".."10", all unanswered at the start
            "application": {str(number): None for number in range(1, 11)},
        }


class UserNotFoundError(Exception):
    """HoloUser could not find user with such an ID in database"""

    def __init__(self, user, user_id):
        self.user = user
        self.user_id = user_id
        super().__init__(f"User of type {type(self.user)} with id {self.user_id} was not found")


class UserInvalidError(Exception):
    """Provided to HoloUser object is not supported"""

    def __init__(self, user):
        self.user = user
        super().__init__(f"Could not find HoloUser by using {type(self.user)} as an input type")


class HoloUser():
    """This object represents a user of HoloChecker bot.
    It is primarily used to interact with a database in
    a more python-friendly way, as well as provide better
    programming experience in case of adding new features, etc.
    """

    def __init__(self, user: Union[User, List[User], ChatMember, int, str]) -> None:
        """A user of Holo bot. Used to simplify DB interaction.

        ### Args:
            * user (`Union[User, List[User], ChatMember, int, str]`): Any possible way to identify the user.
              Pass `User` object, user's telegram ID, username or something...

        ### Raises:
            * UserInvalidError: Provided to `HoloUser` object is not supported
            * UserNotFoundError: `HoloUser` could not find user with such an ID in database
        """

        # Determine input object class and extract id
        if isinstance(user, list) and len(user) != 0:
            self.id = user[0].id
        elif isinstance(user, User):
            self.id = user.id
        elif isinstance(user, ChatMember):
            self.id = user.user.id
        elif isinstance(user, int):
            self.id = user
        elif isinstance(user, str):
            try:
                # NOTE(review): the result of get_users() is used without `await`;
                # the original author marked this line as requiring testing - confirm
                # that it resolves synchronously in the contexts where it is used.
                self.id = (app.get_users(user)).id
            except (bad_request_400.UsernameNotOccupied, bad_request_400.PeerIdInvalid) as exp:
                raise UserInvalidError(user) from exp
        else:
            # Also reached for an empty list
            raise UserInvalidError(user)

        # Find user record in DB
        holo_user = col_users.find_one({"user": self.id})

        if holo_user is None:
            raise UserNotFoundError(user=user, user_id=self.id)

        self.db_id = holo_user["_id"]

        self.link = holo_user["link"]
        self.label = holo_user["label"]
        self.name = holo_user["tg_name"]
        self.phone = holo_user["tg_phone"]
        self.locale = holo_user["tg_locale"]
        self.username = holo_user["tg_username"]

    def set(self, key: str, value: Any) -> None:
        """Set attribute data and save it into database

        ### Args:
            * `key` (`str`): Attribute to be changed
            * `value` (`Any`): Value to set

        ### Raises:
            * AttributeError: This object has no attribute named `key`
        """
        if not hasattr(self, key):
            raise AttributeError(f"{type(self).__name__} has no attribute '{key}'")
        setattr(self, key, value)
        col_users.update_one(filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True)
        logWrite(f"Set attribute {key} of user {self.id} to {value}")

    @staticmethod
    def _with_notice(body: Union[str, None], context: Message, adm_origin: bool, adm_context: bool) -> str:
        """Decorate message text or caption with the sender/reply notices.

        A missing body becomes just the reply notice. A body sent by an admin
        gets the notice appended; a body relayed to an admin additionally gets
        the "message from" header prepended. Otherwise the body is unchanged.
        """
        if body is None:
            return locale("message_reply_notice", "message")
        if adm_context:
            return body + locale("message_reply_notice", "message")
        if adm_origin:
            header = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id)
            return header + body + locale("message_reply_notice", "message")
        return body

    async def _deliver(
        self,
        origin: Union[Message, None],
        text: Union[str, None],
        caption: Union[str, None],
        photo: Union[str, Photo, None],
        video: Union[str, Video, None],
        file: Union[str, Document, None],
        animation: Union[str, Animation, None],
        voice: Union[str, Voice, None]
    ) -> Message:
        """Send the first provided attachment (or plain text) and return the sent message.

        Replies to `origin` when it is given, otherwise sends directly to this user.
        """
        # Priority order of attachments: the first one that is not None wins
        attachments = (
            ("photo", photo, Photo),
            ("video", video, Video),
            ("document", file, Document),
            ("animation", animation, Animation),
            ("voice", voice, Voice),
        )

        for kind, media, media_type in attachments:
            if media is None:
                continue
            # Media objects are sent by their file_id, strings are used as is
            file_id = media.file_id if isinstance(media, media_type) else media
            if origin is not None:
                return await getattr(origin, f"reply_{kind}")(file_id, caption=caption, quote=True)
            # Client.send_* methods need the target chat as the first argument
            # and, unlike the Message.reply_* shortcuts, have no `quote` option
            return await getattr(app, f"send_{kind}")(self.id, file_id, caption=caption)

        if origin is not None:
            return await origin.reply_text(text, quote=True)
        return await app.send_message(self.id, text)

    async def message(self,
        context: Message,
        origin: Union[Message, None] = None,
        text: Union[str, None] = None,
        caption: Union[str, None] = None,
        photo: Union[str, Photo, None] = None,
        video: Union[str, Video, None] = None,
        file: Union[str, Document, None] = None,
        animation: Union[str, Animation, None] = None,
        voice: Union[str, Voice, None] = None,
        adm_origin: bool = False,
        adm_context: bool = False
    ) -> None:
        """Send a message to user

        Any failure while sending is logged, reported to the bot owner and
        acknowledged to the sender instead of being raised.

        ### Args:
            * context (`Message`): Context (mostly the message where this method is called)
            * origin (`Union[Message, None]`, *optional*): Origin message where to refer. None if called in a command. Defaults to None.
            * text (`Union[str, None]`, *optional*): Text if this is a simple text message. Defaults to None.
            * caption (`Union[str, None]`, *optional*): Text if this is a media message. Defaults to None.
            * photo (`Union[str, Photo, None]`, *optional*): Photo as a photo object or file_id as a string. Defaults to None.
            * video (`Union[str, Video, None]`, *optional*): Video as a video object or file_id as a string. Defaults to None.
            * file (`Union[str, Document, None]`, *optional*): File as a document object or file_id as a string. Defaults to None.
            * animation (`Union[str, Animation, None]`, *optional*): Animation as an animation object or file_id as a string. Defaults to None.
            * voice (`Union[str, Voice, None]`, *optional*): Voice as a voice object or file_id as a string. Defaults to None.
            * adm_origin (`bool`, *optional*): Whether origin sender is an admin. Defaults to False.
            * adm_context (`bool`, *optional*): Whether context sender is an admin. Defaults to False.
        """

        # Check if any text available and log message sending
        if text is not None:
            logWrite(f"{context.from_user.id} sent message '{text}' to {self.id}")
        elif caption is not None:
            logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id}")
        else:
            logWrite(f"{context.from_user.id} sent message to {self.id}")

        # Add notices for admin or user
        text = self._with_notice(text, context, adm_origin, adm_context)
        caption = self._with_notice(caption, context, adm_origin, adm_context)

        # Try sending the message
        try:
            # Whether we send_ a message or reply_ to one depends on origin
            new_message = await self._deliver(origin, text, caption, photo, video, file, animation, voice)

            # Acknowledge sending a message and save entry into DB
            await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))
            col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})

        # Report to admin and to sender about message sending failure
        except Exception as exp:
            # format_exc() returns the traceback text; print_exc() would return None
            trace = format_exc()
            logWrite(f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{trace}")
            try:
                await app.send_message(configGet("owner"), locale("message_traceback", "message").format(context.from_user.id, self.id, exp, trace))
            except bad_request_400.PeerIdInvalid:
                logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
            await context.reply_text(locale("message_error", "message"), quote=should_quote(context))

    async def set_label(self, chat: Chat, label: str) -> None:
        """Set label in destination group

        ### Args:
            * chat (`Chat`): Telegram chat
            * label (`str`): Label you want to set
        """
        # set() updates both the attribute and the database record
        self.set("label", label)
        await app.promote_chat_member(configGet("destination_group"), self.id)
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.set_administrator_title(configGet("destination_group"), self.id, label)

    async def reset_label(self, chat: Chat) -> None:
        """Reset label in destination group

        ### Args:
            * chat (`Chat`): Telegram chat
        """
        # set() updates both the attribute and the database record
        self.set("label", "")
        await app.set_administrator_title(configGet("destination_group"), self.id, "")
        if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
            await app.promote_chat_member(configGet("destination_group"), self.id, privileges=ChatPrivileges(
                can_manage_chat=False
            ))

    def application_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
        """Check the current state of application in tmp collection

        ### Returns:
            * `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether application is complete.
        """
        tmp_application = col_tmp.find_one({"user": self.id, "type": "application"})
        if tmp_application is None:
            return "none", False
        return tmp_application["state"], tmp_application["complete"]

    def application_restart(self) -> None:
        """Reset application of a user in tmp collection and replace it with an empty one
        """
        # Deleting is a no-op when there is no record yet, so no lookup is needed
        col_tmp.delete_one({"user": self.id, "type": "application"})
        col_tmp.insert_one(document=DefaultApplicationTemp(self.id).dict)

    async def application_next(self, query: str, msg: Message) -> None:
        """Move on filling application of user

        Validates the answer for the current stage, stores it and asks the
        next question. Nothing happens unless the application state is "fill".

        ### Args:
            * query (`str`): Some kind of input
            * msg (`Message`): Message that should receive replies
        """
        selector = {"user": self.id, "type": "application"}

        progress = col_tmp.find_one(selector)
        if progress is None:
            col_tmp.insert_one(document=DefaultApplicationTemp(self.id).dict)
            progress = col_tmp.find_one(selector)

        if progress["state"] != "fill":
            return

        stage = progress["stage"]
        answers = progress["application"]

        if stage == 2:
            # Date of birth: must be a valid past date and meet the age limit
            try:
                input_dt = datetime.strptime(query, "%d.%m.%Y")
            except ValueError:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
                await msg.reply_text(locale("question2_invalid", "message"), reply_markup=_question_prompt(stage))
                return

            now = datetime.now()

            if now <= input_dt:
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to joking")
                await msg.reply_text(locale("question2_joke", "message"), reply_markup=_question_prompt(stage))
                return

            # relativedelta counts full calendar years and, unlike
            # datetime.replace(year=...), does not fail on the 29th of February
            if relativedelta(now, input_dt).years < configGet("age_allowed"):
                logWrite(f"User {msg.from_user.id} failed stage {stage} due to being underage")
                await msg.reply_text(locale("question2_underage", "message").format(str(configGet("age_allowed"))), reply_markup=_question_prompt(stage))
                return

            answers[str(stage)] = input_dt
            col_tmp.update_one(selector, {"$set": {"application": answers, "stage": stage + 1}})
            await msg.reply_text(locale(f"question{stage+1}", "message"), reply_markup=_question_prompt(stage + 1))

        elif stage == 3:
            # Place of residence: resolved through the GeoNames search API
            try:
                # User input goes through `params` so that it is URL-encoded;
                # the timeout keeps a stalled service from hanging the handler
                response = get(
                    "http://api.geonames.org/searchJSON",
                    params={
                        "q": query,
                        "maxRows": 1,
                        "countryBias": "UA",
                        "lang": "uk",
                        "orderby": "relevance",
                        "featureClass": ["P", "A"],
                        "username": configGet("username", "geocoding"),
                    },
                    timeout=_GEOCODING_TIMEOUT,
                )
                place = response.json()["geonames"][0]
                # Read the fields before saving, so an incomplete result is
                # rejected without advancing the stored stage
                place_name, place_region = place["name"], place["adminName1"]

                answers[str(stage)] = place
                col_tmp.update_one(selector, {"$set": {"application": answers, "stage": stage + 1}})
                await msg.reply_text(locale("question3_found", "message").format(place_name, place_region))
                await msg.reply_text(locale(f"question{stage+1}", "message"), reply_markup=_question_prompt(stage + 1))
            except (ValueError, KeyError, IndexError):
                await msg.reply_text(locale("question3_invalid", "message"), reply_markup=_question_prompt(stage))
                return
            except Exception as exp:
                # Capture the traceback first: format_exc() returns it as text
                trace = format_exc()
                await msg.reply_text(locale("question3_error", "message"), reply_markup=_question_prompt(stage))
                try:
                    await app.send_message(configGet("owner"), locale("question3_traceback", "message").format(query, exp, trace))
                except bad_request_400.PeerIdInvalid:
                    logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
                return

        elif stage == 10:
            # Last question: mark application as complete and ask to confirm it
            answers[str(stage)] = query
            col_tmp.update_one(selector, {"$set": {"application": answers, "complete": True}})

            application_content = []

            for number, question in enumerate(answers, start=1):
                title = locale(f"question{number}", "message", "question_titles")
                if number == 2:
                    age = relativedelta(datetime.now(), answers["2"])
                    application_content.append(f"{title} {answers['2'].strftime('%d.%m.%Y')} ({age.years} р.)")
                elif number == 3:
                    place = answers["3"]
                    if place["countryCode"] == "UA":
                        application_content.append(f"{title} {place['name']} ({place['adminName1']})")
                    else:
                        # The country is shown only for places outside of Ukraine
                        application_content.append(f"{title} {place['name']} ({place['adminName1']}, {place['countryName']})")
                else:
                    application_content.append(f"{title} {answers[question]}")

            await msg.reply_text(locale("confirm", "message").format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard"), resize_keyboard=True))

        else:
            # Free-form question: store the answer as is
            answers[str(stage)] = query
            col_tmp.update_one(selector, {"$set": {"application": answers, "stage": stage + 1}})
            await msg.reply_text(locale(f"question{stage+1}", "message"), reply_markup=_question_prompt(stage + 1))

        logWrite(f"User {self.id} completed stage {stage} of application")