"""Telegram bot that walks a user through a numbered questionnaire.

The finished application is forwarded to the configured admin group, where
admins approve it (the applicant receives a one-time invite link to the
destination group) or refuse it via inline buttons.
"""

# NOTE: import order is kept exactly as in the original file because the star
# import from modules.utils may define names that later imports override.
from datetime import datetime, timedelta
from time import time
from os import getpid, path
from modules.utils import *
from pyrogram.client import Client
from pyrogram import filters
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, BotCommand, BotCommandScopeChat, ReplyKeyboardMarkup, ForceReply, ReplyKeyboardRemove
from pyrogram import idle  # type: ignore
from pyrogram.errors.exceptions import bad_request_400

pid = getpid()

app = Client("holochecker", bot_token=configGet("bot_token", "bot"), api_id=configGet("api_id", "bot"), api_hash=configGet("api_hash", "bot"))


# Helpers ======================================================================================================================
def _user_file(user_id) -> str:
    """Return the path of the JSON file holding the data of user *user_id*."""
    return f"{configGet('data', 'locations')}{sep}users{sep}{user_id}.json"


def _applications_file() -> str:
    """Return the path of the JSON file holding all submitted applications."""
    return f"{configGet('data', 'locations')}{sep}applications.json"


def _get_stage(user_id):
    """Return the questionnaire stage of *user_id*, or None if the user has no data file.

    cmd_start relies on configGet raising FileNotFoundError for unknown users;
    the same condition is translated to None here so handlers can ignore
    people who never issued /start instead of crashing.
    """
    try:
        return configGet("stage", file=str(user_id))
    except FileNotFoundError:
        return None


def _reset_user_profile(user) -> None:
    """(Re)create the data file of *user* from the default template and store Telegram profile fields."""
    jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), _user_file(user.id))
    configSet("telegram_id", str(user.username), file=str(user.id))
    configSet("telegram_name", f"{user.first_name} {user.last_name}", file=str(user.id))
    configSet("telegram_phone", str(user.phone_number), file=str(user.id))
    configSet("telegram_locale", str(user.language_code), file=str(user.id))


def _format_application(user_id) -> str:
    """Return the answers of *user_id* as newline-separated "<question title> <answer>" lines.

    Titles are numbered by position (question1, question2, ...) in the order
    the answers are stored. The answers are read once instead of once per line.
    """
    answers = configGet("application", file=str(user_id))
    return "\n".join(
        f"{locale('question'+str(index), 'message', 'question_titles')} {answers[key]}"
        for index, key in enumerate(answers, start=1)
    )


def _update_application(user_id: str, **fields) -> None:
    """Merge *fields* into the stored application record of *user_id*.

    Raises:
        KeyError: if no application is stored for *user_id*.
    """
    applications = jsonLoad(_applications_file())
    applications[user_id].update(fields)
    jsonSave(applications, _applications_file())


def _callback_user_id(clb) -> str:
    """Return the applicant id encoded as the last "_"-separated part of the callback data."""
    return clb.data.rsplit("_", 1)[-1]


async def _refuse_application(app, clb, *, admin_notice: str, answer_key: str, log_reason: str = "", extra_user_messages=()) -> None:
    """Shared implementation of the refusal callbacks.

    Args:
        app: client used to send messages.
        clb: callback query produced by the pressed inline button.
        admin_notice: "message" locale key of the notice posted to the admin group.
        answer_key: "callback" locale key of the alert shown to the admin.
        log_reason: suffix appended to the log line (e.g. " due to being aggressive").
        extra_user_messages: additional "message" locale keys sent to the
            applicant after the generic refusal text.
    """
    user_id = _callback_user_id(clb)

    await app.send_message(configGet("admin_group"), locale(admin_notice, "message").format(clb.from_user.first_name, user_id), disable_notification=True)  # type: ignore
    await app.send_message(int(user_id), locale("refused", "message"))
    for message_key in extra_user_messages:
        await app.send_message(int(user_id), locale(message_key, "message"))
    logWrite(f"User {user_id} got refused by {clb.from_user.id}{log_reason}")

    configSet("refused", True, file=user_id)
    _update_application(user_id, refused=True, refused_by=clb.from_user.id, refusal_date=int(time()))

    # Replace the decision buttons with a single inert "declined" marker.
    edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
    await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
    await clb.answer(text=locale(answer_key, "callback").format(user_id), show_alert=True)  # type: ignore
# ==============================================================================================================================


# Start command ================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["start"], prefixes=["/"]))
async def cmd_start(app, msg):
    """Greet a user who has not begun the questionnaire and create their data file if needed."""
    try:
        user_stage = configGet("stage", file=str(msg.from_user.id))
        if user_stage != 0:
            # The questionnaire is already in progress or finished.
            return
    except FileNotFoundError:
        # First contact: create the user file from the default template.
        # NOTE(review): the original indentation was lost; the profile fields
        # are assumed to be written only when the file is created - confirm.
        _reset_user_profile(msg.from_user)

    logWrite(f"User {msg.from_user.id} started bot interaction")
    await msg.reply_text(locale("start", "message"), reply_markup=ReplyKeyboardMarkup(locale("welcome", "keyboard"), resize_keyboard=True))  # type: ignore
# ==============================================================================================================================


# Shutdown command =============================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["kill", "die", "reboot"], prefixes=["", "/"]))
async def cmd_kill(app, msg):
    """Terminate the bot process when requested by the owner or an admin."""
    if (msg.from_user.id == configGet("owner")) or (msg.from_user.id in configGet("admins")):
        logWrite(f"Shutting down bot with pid {pid}")
        await msg.reply_text(f"Вимкнення бота з підом `{pid}`")
        killProc(pid)
# ==============================================================================================================================


# Welcome check ================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("welcome", "keyboard")[0][0]) | filters.regex(locale("return", "keyboard")[0][0])))
async def welcome_pass(app, msg, once_again: bool = True):
    """Start the questionnaire: ask the first question and move the user to stage 1.

    Args:
        once_again: when False, the privacy notice is sent first.
    """
    # NOTE(review): when invoked as a message handler this parameter keeps its
    # default (True), so the privacy notice is not sent on that path - confirm
    # whether the default was meant to be False.
    if not once_again:
        await msg.reply_text(locale("privacy_notice", "message"))
    logWrite(f"User {msg.from_user.id} confirmed starting the application")
    await msg.reply_text(locale("question1", "message"), reply_markup=ForceReply(placeholder=locale("question1", "force_reply")))  # type: ignore
    configSet("stage", 1, file=str(msg.from_user.id))


@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("welcome", "keyboard")[1][0])))
async def welcome_reject(app, msg):
    """Say goodbye to a user who declined to start and offer a way back."""
    logWrite(f"User {msg.from_user.id} refused to start the application")
    await msg.reply_text(locale("goodbye", "message"), reply_markup=ReplyKeyboardMarkup(locale("return", "keyboard"), resize_keyboard=True))  # type: ignore
# ==============================================================================================================================


# Confirmation =================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("confirm", "keyboard")[0][0])))
async def confirm_yes(app, msg):
    """Submit the finished application: store it and post it to the admin group with decision buttons."""
    user = msg.from_user

    if _get_stage(user.id) != 10:
        return
    # NOTE(review): the original indentation was lost; the whole submission is
    # assumed to be guarded by the "sent" flag so that it happens once - confirm.
    if configGet("sent", file=str(user.id)):
        return

    await msg.reply_text(locale("application_sent", "message"), reply_markup=ReplyKeyboardRemove())

    applications = jsonLoad(_applications_file())
    applications[str(user.id)] = {
        "approved": False,
        "approved_by": None,
        "approval_date": None,
        "refused": False,
        # NOTE(review): False (not None, as for the approval fields) is what
        # the original stored; kept unchanged for data compatibility.
        "refused_by": False,
        "refusal_date": False,
        "application_date": int(time()),
        "application": configGet("application", file=str(user.id))
    }
    jsonSave(applications, _applications_file())

    await app.send_message(
        chat_id=configGet("admin_group"),
        text=(locale("application_got", "message")).format(str(user.id), user.first_name, user.last_name, user.username, _format_application(user.id)),
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=InlineKeyboardMarkup(  # type: ignore
            [
                [
                    InlineKeyboardButton(text=str(locale("sub_yes", "button")), callback_data=f"sub_yes_{user.id}")
                ],
                [
                    InlineKeyboardButton(text=str(locale("sub_no", "button")), callback_data=f"sub_no_{user.id}")
                ],
                [
                    # The prefix must match the filter of callback_query_refuse_aggressive;
                    # it used to be misspelled "sub_no_aggresive_".
                    InlineKeyboardButton(text=str(locale("sub_no_aggressive", "button")), callback_data=f"sub_no_aggressive_{user.id}")
                ],
                [
                    InlineKeyboardButton(text=str(locale("sub_no_russian", "button")), callback_data=f"sub_no_russian_{user.id}")
                ]
            ]
        )
    )

    logWrite(f"User {user.id} sent his application and it will now be reviewed")

    configSet("sent", True, file=str(user.id))
    configSet("confirmed", True, file=str(user.id))


@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("confirm", "keyboard")[1][0])))
async def confirm_no(app, msg):
    """Discard the answers of a user who rejected the summary and restart the questionnaire."""
    if _get_stage(msg.from_user.id) != 10:
        return
    _reset_user_profile(msg.from_user)
    await welcome_pass(app, msg, once_again=True)
    logWrite(f"User {msg.from_user.id} restarted the application due to typo in it")
# ==============================================================================================================================


# Callbacks ====================================================================================================================
@app.on_callback_query(filters.regex(r"sub_yes_[\s\S]*"))  # type: ignore
async def callback_query_accept(app, clb):
    """Approve an application and send the applicant a one-time invite link valid for one day."""
    user_id = _callback_user_id(clb)

    await app.send_message(configGet("admin_group"), locale("approved_by", "message").format(clb.from_user.first_name, user_id), disable_notification=True)  # type: ignore
    logWrite(f"User {user_id} got approved by {clb.from_user.id}")

    link = await app.create_chat_invite_link(configGet("destination_group"), name=f"Invite for {user_id}", member_limit=1, expire_date=datetime.now()+timedelta(days=1))

    await app.send_message(int(user_id), locale("approved", "message"), reply_markup=InlineKeyboardMarkup(
        [[
            InlineKeyboardButton(str(locale("join", "button")), url=link.invite_link)
        ]]
    ))

    configSet("approved", True, file=user_id)
    configSet("link", link.invite_link, file=user_id)
    logWrite(f"User {user_id} got an invite link {link.invite_link}")

    _update_application(user_id, approved=True, approved_by=clb.from_user.id, approval_date=int(time()))

    # Replace the decision buttons with a single inert "accepted" marker.
    edited_markup = [[InlineKeyboardButton(text=str(locale("accepted", "button")), callback_data="nothing")]]
    await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
    await clb.answer(text=locale("sub_accepted", "callback").format(user_id), show_alert=True)  # type: ignore


# The specific refusal handlers are registered before the generic "sub_no_"
# one so that they get the first chance to match their callback data.
# "aggress?ive" also accepts the misspelled prefix carried by buttons that
# were posted before the callback data was corrected.
@app.on_callback_query(filters.regex(r"sub_no_aggress?ive_[\s\S]*"))  # type: ignore
async def callback_query_refuse_aggressive(app, clb):
    """Refuse an application because the applicant was aggressive."""
    await _refuse_application(
        app, clb,
        admin_notice="refused_by_agr",
        answer_key="sub_no_aggressive",
        log_reason=" due to being aggressive",
    )


@app.on_callback_query(filters.regex(r"sub_no_russian_[\s\S]*"))  # type: ignore
async def callback_query_refuse_russian(app, clb):
    """Refuse an application because the applicant is russian; sends an extra explanation."""
    await _refuse_application(
        app, clb,
        admin_notice="refused_by_rus",
        answer_key="sub_no_russian",
        log_reason=" due to being russian",
        extra_user_messages=("refused_russian",),
    )


@app.on_callback_query(filters.regex(r"sub_no_[\s\S]*"))  # type: ignore
async def callback_query_refuse(app, clb):
    """Refuse an application without a specific reason."""
    await _refuse_application(
        app, clb,
        admin_notice="refused_by",
        answer_key="sub_refused",
    )
# ==============================================================================================================================


# Contact getting ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.contact & filters.private)
async def get_contact(app, msg):
    """Show the application and its status for a shared contact.

    Only approved users, admins and the owner get an answer; everybody else
    is ignored.
    """
    requester_id = msg.from_user.id
    requester_file = _user_file(requester_id)

    is_approved_user = path.exists(requester_file) and jsonLoad(requester_file)["approved"]
    if not (is_approved_user or (requester_id in configGet("admins")) or (requester_id == configGet("owner"))):
        return

    contact_id = msg.contact.user_id
    if contact_id is None:
        # The shared contact is not linked to a Telegram account.
        logWrite(f"User {msg.from_user.id} requested application of someone but user is not telegram user")
        await msg.reply_text(locale("contact_not_member", "message"))
        return

    try:
        user_data = jsonLoad(_user_file(contact_id))
        application_content = _format_application(contact_id)

        if user_data["sent"]:
            application = jsonLoad(_applications_file())[str(contact_id)]
            if user_data["approved"]:
                application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M"))  # type: ignore
            elif application["refused"]:
                application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M"))  # type: ignore
            else:
                application_status = locale("application_status_on_hold", "message")
        else:
            application_status = locale("application_status_not_send", "message")

        logWrite(f"User {msg.from_user.id} requested application of {msg.contact.user_id}")
        await msg.reply_text(locale("contact", "message").format(str(contact_id), application_content, application_status))  # type: ignore

    except FileNotFoundError:
        logWrite(f"User {msg.from_user.id} requested application of {msg.contact.user_id} but user does not exists")
        await msg.reply_text(locale("contact_invalid", "message"))
# ==============================================================================================================================


# Any other input ==============================================================================================================
async def _ask_next_question(msg, user_stage: int) -> None:
    """Send question number user_stage+1 with its force-reply placeholder."""
    await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))


async def _handle_age_answer(msg, user_stage: int) -> None:
    """Validate the numeric answer of stage 2 (age) and advance only if it is acceptable."""
    user_file = str(msg.from_user.id)
    retry_markup = ForceReply(placeholder=str(locale("question2", "force_reply")))

    try:
        # msg.text is None for non-text messages, which makes int() raise
        # TypeError rather than ValueError.
        age = int(msg.text)
    except (TypeError, ValueError):
        logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to sending not int")
        await msg.reply_text(locale(f"question2_invalid", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage}", "force_reply"))))
        return

    configSet(str(user_stage), age, "application", file=user_file)

    # The original joke list was [-1, 0, 128, 256, 512, 1024, 2048]; every
    # value above 0 in it is already covered by the ">= 100" test.
    if age in (-1, 0) or age >= 100:
        logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to joking")
        await msg.reply_text(locale("question2_joke", "message"), reply_markup=retry_markup)
    elif age < configGet("age_allowed"):
        logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to being underage")
        await msg.reply_text(locale("question2_underage", "message").format(str(configGet("age_allowed"))), reply_markup=retry_markup)  # type: ignore
    else:
        logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
        await _ask_next_question(msg, user_stage)
        configSet("stage", user_stage+1, file=user_file)


@app.on_message(~ filters.scheduled & filters.private)
async def any_stage(app, msg):
    """Treat any other private message as the answer to the user's current question.

    Stage N means the user is answering question N. Stage 2 is validated as a
    number; after the last question a summary is shown for confirmation.
    """
    user_file = str(msg.from_user.id)
    user_stage = _get_stage(msg.from_user.id)

    # Unknown users (no data file) and users who have not passed the welcome
    # step (stage 0) have no pending question, so their input is ignored
    # instead of being stored as a bogus answer under key "0".
    if user_stage is None or user_stage < 1:
        return

    if user_stage == 2:
        await _handle_age_answer(msg, user_stage)
        return

    if user_stage <= 9:
        logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
        await _ask_next_question(msg, user_stage)
        configSet(str(user_stage), str(msg.text), "application", file=user_file)
        configSet("stage", user_stage+1, file=user_file)
        return

    # Last stage: the questionnaire is complete.
    if configGet("sent", file=user_file):
        # Stay silent once a decision has been made.
        if not configGet("approved", file=user_file) and not configGet("refused", file=user_file):
            await msg.reply_text(locale("already_sent", "message"))
        return

    if configGet("confirmed", file=user_file):
        await msg.reply_text(locale("already_sent", "message"))
        return

    # Store the final answer BEFORE rendering the summary so that the
    # confirmation message includes it.
    configSet(str(user_stage), str(msg.text), "application", file=user_file)
    await msg.reply_text(locale("confirm", "message").format(_format_application(msg.from_user.id)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard"), resize_keyboard=True))  # type: ignore
# ==============================================================================================================================


if __name__ == "__main__":

    logWrite(f"Starting up with pid {pid}")

    # The client is driven synchronously here instead of from an async main();
    # this was a deliberate choice of the original author.
    app.start()  # type: ignore
    app.send_message(configGet("owner"), f"Starting up with pid `{pid}`")  # type: ignore

    # TODO: registration of regular user commands (config key "commands") is
    # not enabled; only admin commands are registered below.

    # Registering admin commands
    admin_commands = configGet("commands_admin")
    commands_admin_list = [BotCommand(command, admin_commands[command]) for command in admin_commands]

    for admin in configGet("admins"):
        try:
            app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin))  # type: ignore
        except bad_request_400.PeerIdInvalid:
            # Best effort: this admin's chat is unknown to the bot, so skip it
            # but leave a trace in the log.
            logWrite(f"Could not register admin commands for {admin}: peer id is invalid")

    app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner")))  # type: ignore

    idle()

    app.send_message(configGet("owner"), f"Shutting with pid `{pid}`")  # type: ignore
    app.stop()  # type: ignore

    killProc(pid)