import asyncio from time import time from os import getpid, listdir, remove from modules.utils import * from pyrogram.client import Client from pyrogram import filters from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, BotCommand, BotCommandScopeChat, ReplyKeyboardMarkup, ForceReply, ReplyKeyboardRemove from pyrogram import idle # type: ignore from pyrogram.errors.exceptions import bad_request_400 pid = getpid() app = Client("holochecker", bot_token=configGet("bot_token", "bot"), api_id=configGet("api_id", "bot"), api_hash=configGet("api_hash", "bot")) @app.on_message(~ filters.scheduled & filters.command(["start"], prefixes=["/"])) async def cmd_start(app, msg): try: user_stage = configGet("stage", file=str(msg.from_user.id)) if user_stage != 0: return except FileNotFoundError: jsonSave(jsonLoad(f"data{sep}user_default.json"), f"data{sep}users{sep}{msg.from_user.id}.json") user_stage = configGet("stage", file=str(msg.from_user.id)) configSet("telegram_id", str(msg.from_user.username), file=str(msg.from_user.id)) configSet("telegram_name", f"{msg.from_user.first_name} {msg.from_user.last_name}", file=str(msg.from_user.id)) configSet("telegram_phone", str(msg.from_user.phone_number), file=str(msg.from_user.id)) configSet("telegram_locale", str(msg.from_user.language_code), file=str(msg.from_user.id)) await msg.reply_text(locale("start", "message"), reply_markup=ReplyKeyboardMarkup(locale("welcome", "keyboards"), resize_keyboard=True)) @app.on_message(~ filters.scheduled & filters.command(["kill", "die", "reboot"], prefixes=["", "/"])) async def cmd_kill(app, msg): if (msg.from_user.id == configGet("owner")) or (msg.from_user.id in configGet("admins")): logWrite(f"Shutting down bot with pid {pid}") await msg.reply_text(f"Вимкнення бота з підом `{pid}`") killProc(pid) @app.on_message(~ filters.scheduled & (filters.regex(locale("welcome", "keyboards")[0][0]) | filters.regex(locale("return", "keyboards")[0][0]))) async def welcome_pass(app, msg, once_again: bool = True): if not once_again: await msg.reply_text(locale("privacy_notice", "message")) await msg.reply_text(locale("question1", "message"), reply_markup=ForceReply(placeholder=locale("question1", "force_reply"))) configSet("stage", 1, file=str(msg.from_user.id)) @app.on_message(~ filters.scheduled & (filters.regex(locale("welcome", "keyboards")[1][0]))) async def welcome_reject(app, msg): await msg.reply_text(locale("goodbye", "message"), reply_markup=ReplyKeyboardMarkup(locale("return", "keyboards"), resize_keyboard=True)) @app.on_message(~ filters.scheduled & (filters.regex(locale("confirm", "keyboards")[0][0]))) async def confirm_yes(app, msg): user_stage = configGet("stage", file=str(msg.from_user.id)) if user_stage == 10: await msg.reply_text("Alright! You did it!", reply_markup=ReplyKeyboardRemove()) # send to admins configSet("sent", True, file=str(msg.from_user.id)) configSet("application_date", int(time()), file=str(msg.from_user.id)) @app.on_message(~ filters.scheduled & (filters.regex(locale("confirm", "keyboards")[1][0]))) async def confirm_no(app, msg): user_stage = configGet("stage", file=str(msg.from_user.id)) if user_stage == 10: jsonSave(jsonLoad(f"data{sep}user_default.json"), f"data{sep}users{sep}{msg.from_user.id}.json") configSet("telegram_id", str(msg.from_user.username), file=str(msg.from_user.id)) configSet("telegram_name", f"{msg.from_user.first_name} {msg.from_user.last_name}", file=str(msg.from_user.id)) configSet("telegram_phone", str(msg.from_user.phone_number), file=str(msg.from_user.id)) configSet("telegram_locale", str(msg.from_user.language_code), file=str(msg.from_user.id)) await welcome_pass(app, msg, once_again=True) @app.on_message(~ filters.scheduled) async def any_stage(app, msg): user_stage = configGet("stage", file=str(msg.from_user.id)) if user_stage == 1: await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=locale(f"question{user_stage+1}", "force_reply"))) configSet(str(user_stage), str(msg.text), "application", file=str(msg.from_user.id)) configSet("stage", user_stage+1, file=str(msg.from_user.id)) elif user_stage == 2: try: configSet(str(user_stage), int(msg.text), "application", file=str(msg.from_user.id)) await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=locale(f"question{user_stage+1}", "force_reply"))) configSet("stage", user_stage+1, file=str(msg.from_user.id)) except ValueError: await msg.reply_text(locale(f"question2_invalid", "message"), reply_markup=ForceReply(placeholder=locale(f"question{user_stage}", "force_reply"))) else: if user_stage <= 9: await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=locale(f"question{user_stage+1}", "force_reply"))) configSet(str(user_stage), str(msg.text), "application", file=str(msg.from_user.id)) configSet("stage", user_stage+1, file=str(msg.from_user.id)) else: if not configGet("sent", file=str(msg.from_user.id)): if not configGet("confirmed", file=str(msg.from_user.id)): await msg.reply_text(locale("confirm", "message"), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboards"), resize_keyboard=True)) configSet(str(user_stage), str(msg.text), "application", file=str(msg.from_user.id)) #configSet("sent", True, file=str(msg.from_user.id)) #configSet("application_date", int(time()), file=str(msg.from_user.id)) else: await msg.reply_text("You are already done, wait!") else: await msg.reply_text("You are already done, wait!") if __name__ == "__main__": logWrite(f"Starting up with pid {pid}") # Yes, it should be in some kind of async main() function but I don't give a shit. # I did compare performance, almost no difference and it's much more useful this way. Change my mind. app.start() # type: ignore app.send_message(configGet("owner"), f"Starting up with pid `{pid}`") # type: ignore # Registering user commands for fallback locale commands_list = [] for command in configGet("commands"): commands_list.append(BotCommand(command, configGet("commands")[command])) app.set_bot_commands(commands_list) # type: ignore # Registering admin commands commands_admin_list = [] for command in configGet("commands"): commands_admin_list.append(BotCommand(command, configGet("commands")[command])) for command in configGet("commands_admin"): commands_admin_list.append(BotCommand(command, configGet("commands_admin")[command])) for admin in configGet("admins"): try: app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin)) # type: ignore except bad_request_400.PeerIdInvalid: pass app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner"))) # type: ignore idle() app.send_message(configGet("owner"), f"Shutting with pid `{pid}`") # type: ignore app.stop() # type: ignore killProc(pid)