This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/main.py
2022-10-21 14:00:03 +02:00

274 lines
14 KiB
Python

from time import time
from os import getpid, listdir, remove
from modules.utils import *
from pyrogram.client import Client
from pyrogram import filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, BotCommand, BotCommandScopeChat, ReplyKeyboardMarkup, ForceReply, ReplyKeyboardRemove
from pyrogram import idle # type: ignore
from pyrogram.errors.exceptions import bad_request_400
# Process id of this bot instance; it is reported in the start-up/shutdown
# messages and handed to killProc() when the bot is stopped.
pid = getpid()

# Bot client. Token and API credentials come from configGet(); the second
# argument ("bot") is presumably the config section they live in.
app = Client(
    "holochecker",
    bot_token=configGet("bot_token", "bot"),
    api_id=configGet("api_id", "bot"),
    api_hash=configGet("api_hash", "bot"),
)
# Start command ================================================================================================================
@app.on_message(
    ~ filters.scheduled
    & filters.private
    & filters.command(["start"], prefixes=["/"])
)
async def cmd_start(app, msg):
    """Handle /start in a private chat.

    Replies with the start message and the welcome keyboard when the
    sender's stored stage is 0; returns silently at any other stage.
    When the sender has no profile yet (configGet raises FileNotFoundError)
    a profile is created from the default template and filled with the
    sender's Telegram details before the reply is sent.

    Args:
        app: The client that received the update.
        msg: The incoming message.
    """
    sender = msg.from_user
    profile = str(sender.id)

    try:
        if configGet("stage", file=profile) != 0:
            return
    except FileNotFoundError:
        # NOTE(review): the indentation of this file was reconstructed; the
        # profile initialisation is assumed to belong to this branch only.
        jsonSave(jsonLoad(f"data{sep}user_default.json"), f"data{sep}users{sep}{sender.id}.json")
        # The stage is read back from the fresh profile as in the original
        # code; the value itself is not used afterwards.
        configGet("stage", file=profile)
        telegram_details = (
            ("telegram_id", str(sender.username)),
            ("telegram_name", f"{sender.first_name} {sender.last_name}"),
            ("telegram_phone", str(sender.phone_number)),
            ("telegram_locale", str(sender.language_code)),
        )
        for key, value in telegram_details:
            configSet(key, value, file=profile)

    welcome_keyboard = ReplyKeyboardMarkup(locale("welcome", "keyboard"), resize_keyboard=True)
    await msg.reply_text(locale("start", "message"), reply_markup=welcome_keyboard)
# ==============================================================================================================================
# Shutdown command =============================================================================================================
@app.on_message(
    ~ filters.scheduled
    & filters.private
    & filters.command(["kill", "die", "reboot"], prefixes=["", "/"])
)
async def cmd_kill(app, msg):
    """Shut the bot down on request of the owner or an admin.

    Messages from anyone else are ignored. On an authorised request the
    shutdown is logged, acknowledged in the chat and the bot's own process
    is passed to killProc().

    Args:
        app: The client that received the update.
        msg: The incoming message.
    """
    sender_id = msg.from_user.id
    authorised = sender_id == configGet("owner") or sender_id in configGet("admins")
    if not authorised:
        return

    logWrite(f"Shutting down bot with pid {pid}")
    await msg.reply_text(f"Вимкнення бота з підом `{pid}`")
    killProc(pid)
# ==============================================================================================================================
# Welcome check ================================================================================================================
@app.on_message(
    ~ filters.scheduled
    & filters.private
    & (
        filters.regex(locale("welcome", "keyboard")[0][0])
        | filters.regex(locale("return", "keyboard")[0][0])
    )
)
async def welcome_pass(app, msg, once_again: bool = True):
    """Start the questionnaire: ask the first question and set stage to 1.

    Triggered by the first button of the "welcome" keyboard or of the
    "return" keyboard; also called directly when a user restarts their
    application.

    Args:
        app: The client that received the update.
        msg: The incoming message.
        once_again: When false, the privacy notice is sent before the first
            question. Defaults to True, so handler invocations skip it.
    """
    if not once_again:
        await msg.reply_text(locale("privacy_notice", "message"))

    question_text = locale("question1", "message")
    answer_prompt = ForceReply(placeholder=locale("question1", "force_reply"))
    await msg.reply_text(question_text, reply_markup=answer_prompt)

    configSet("stage", 1, file=str(msg.from_user.id))
@app.on_message(
    ~ filters.scheduled
    & filters.private
    & (filters.regex(locale("welcome", "keyboard")[1][0]))
)
async def welcome_reject(app, msg):
    """Say goodbye when the user picks the second "welcome" keyboard button.

    The reply carries the "return" keyboard so the user can come back later.

    Args:
        app: The client that received the update.
        msg: The incoming message.
    """
    return_keyboard = ReplyKeyboardMarkup(locale("return", "keyboard"), resize_keyboard=True)
    await msg.reply_text(locale("goodbye", "message"), reply_markup=return_keyboard)
# ==============================================================================================================================
# Confirmation =================================================================================================================
@app.on_message(
    ~ filters.scheduled
    & filters.private
    & (filters.regex(locale("confirm", "keyboard")[0][0]))
)
async def confirm_yes(app, msg):
    """Submit the finished application after the user confirms it.

    Only acts when the sender's stage is 10. The user is told the
    application was sent, a record is stored in the applications file,
    the answers are forwarded to the admin group with approve/refuse
    buttons, and the user's profile is marked as sent.

    Args:
        app: The client that received the update.
        msg: The incoming message.
    """
    sender = msg.from_user
    profile = str(sender.id)

    if configGet("stage", file=profile) != 10:
        return

    await msg.reply_text(locale("application_sent", "message"), reply_markup=ReplyKeyboardRemove())

    # Read the answers and take the timestamp once, so the stored record, the
    # admin message and the profile all describe the same submission.
    application = configGet("application", file=profile)
    submitted_at = int(time())

    applications_path = f"data{sep}applications.json"
    applications = jsonLoad(applications_path)
    applications[profile] = {
        "approved": False,
        "approved_by": None,
        "approval_date": None,
        "refused": False,
        # NOTE(review): these two use False while the approval counterparts
        # use None; kept as-is because readers of this file are not visible here.
        "refused_by": False,
        "refusal_date": False,
        "application_date": submitted_at,
        "application": application,
    }
    jsonSave(applications, applications_path)

    # One line per answer: "<question title> <answer>", numbered from 1 in
    # the order the answers are stored.
    application_content = [
        f"{locale(f'question{index}', 'message', 'question_titles')} {answer}"
        for index, answer in enumerate(application.values(), start=1)
    ]

    # One button per row; the applicant's id is appended to the callback data.
    review_buttons = InlineKeyboardMarkup(
        [
            [InlineKeyboardButton(text=locale(action, "button"), callback_data=f"{action}_{sender.id}")]
            for action in ("sub_yes", "sub_no", "sub_no_aggresive", "sub_no_russian")
        ]
    )

    await app.send_message(
        chat_id=configGet("admin_group"),
        text=locale("application_got", "message").format(
            str(sender.id),
            sender.first_name,
            sender.last_name,
            sender.username,
            "\n".join(application_content),
        ),
        reply_markup=review_buttons,
    )

    configSet("sent", True, file=profile)
    configSet("application_date", submitted_at, file=profile)
@app.on_message(
    ~ filters.scheduled
    & filters.private
    & (filters.regex(locale("confirm", "keyboard")[1][0]))
)
async def confirm_no(app, msg):
    """Discard the filled-in application and restart the questionnaire.

    Only acts when the sender's stage is 10. The profile is overwritten
    with the default template, the Telegram details are stored again and
    the first question is asked via welcome_pass().

    Args:
        app: The client that received the update.
        msg: The incoming message.
    """
    sender = msg.from_user
    profile = str(sender.id)

    if configGet("stage", file=profile) != 10:
        return

    jsonSave(jsonLoad(f"data{sep}user_default.json"), f"data{sep}users{sep}{sender.id}.json")

    telegram_details = (
        ("telegram_id", str(sender.username)),
        ("telegram_name", f"{sender.first_name} {sender.last_name}"),
        ("telegram_phone", str(sender.phone_number)),
        ("telegram_locale", str(sender.language_code)),
    )
    for key, value in telegram_details:
        configSet(key, value, file=profile)

    await welcome_pass(app, msg, once_again=True)
# ==============================================================================================================================
# Callbacks ====================================================================================================================
# Raw string: "\s" in a normal string literal is an invalid escape sequence.
@app.on_callback_query(filters.regex(r"sub_yes_[\s\S]*"))
async def callback_query_accept(app, clb):
    """Handle the "approve" button under an application in the admin group.

    Callback data has the form ``sub_yes_<user id>``. The admin group and the
    applicant are notified, the buttons under the application are replaced
    with a single "accepted" button, and the callback is answered with an alert.

    Args:
        app: The client that received the update.
        clb: The incoming callback query.
    """
    fullclb = clb.data.split("_")
    applicant_id = fullclb[2]
    user_locale = clb.from_user.language_code

    await app.send_message(configGet("admin_group"), f"Approved by {clb.from_user.id}")
    await app.send_message(int(applicant_id), "You got approved")

    # "nothing" is the callback data of the replacement button; no handler in
    # this part of the file matches it.
    edited_markup = [[InlineKeyboardButton(text=locale("accepted", "button"), callback_data="nothing")]]
    await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))

    await clb.answer(text=locale("sub_block", "callback", locale=user_locale).format(applicant_id), show_alert=True)
# The pattern is limited to "sub_no_<digits>": the previous "sub_no_" prefix
# match also accepted "sub_no_aggresive_<id>" and "sub_no_russian_<id>", whose
# third segment is not a number.
@app.on_callback_query(filters.regex(r"^sub_no_\d+$"))
async def callback_query_refuse(app, clb):
    """Handle the plain "refuse" button under an application.

    Callback data has the form ``sub_no_<user id>``. The admin group and the
    applicant are notified, the buttons under the application are replaced
    with a single "declined" button, and the callback is answered with an alert.

    Args:
        app: The client that received the update.
        clb: The incoming callback query.
    """
    # The user id is always the last underscore-separated segment.
    applicant_id = clb.data.split("_")[-1]
    user_locale = clb.from_user.language_code

    await app.send_message(configGet("admin_group"), f"Refused by {clb.from_user.id}")
    await app.send_message(int(applicant_id), "You got refused")

    edited_markup = [[InlineKeyboardButton(text=locale("declined", "button"), callback_data="nothing")]]
    await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))

    await clb.answer(text=locale("sub_block", "callback", locale=user_locale).format(applicant_id), show_alert=True)
# NOTE(review): pyrogram is understood to run only the first matching handler
# of a group, so no earlier-registered pattern may also match this data.
@app.on_callback_query(filters.regex(r"sub_no_aggresive_[\s\S]*"))
async def callback_query_refuse(app, clb):
    """Handle the "refuse (aggressive)" button under an application.

    Callback data has the form ``sub_no_aggresive_<user id>``. The admin group
    and the applicant are notified, the buttons under the application are
    replaced with a single "declined" button, and the callback is answered
    with an alert.

    Args:
        app: The client that received the update.
        clb: The incoming callback query.
    """
    # The id is the LAST segment; index 2 is the word "aggresive" for this
    # callback and cannot be converted to int.
    applicant_id = clb.data.split("_")[-1]
    user_locale = clb.from_user.language_code

    await app.send_message(configGet("admin_group"), f"Refused by {clb.from_user.id}")
    await app.send_message(int(applicant_id), "You got refused")

    edited_markup = [[InlineKeyboardButton(text=locale("declined", "button"), callback_data="nothing")]]
    await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))

    await clb.answer(text=locale("sub_block", "callback", locale=user_locale).format(applicant_id), show_alert=True)
# NOTE(review): pyrogram is understood to run only the first matching handler
# of a group, so no earlier-registered pattern may also match this data.
@app.on_callback_query(filters.regex(r"sub_no_russian_[\s\S]*"))
async def callback_query_refuse(app, clb):
    """Handle the "refuse (russian)" button under an application.

    Callback data has the form ``sub_no_russian_<user id>``. The admin group
    and the applicant are notified, the buttons under the application are
    replaced with a single "declined" button, and the callback is answered
    with an alert.

    Args:
        app: The client that received the update.
        clb: The incoming callback query.
    """
    # The id is the LAST segment; index 2 is the word "russian" for this
    # callback and cannot be converted to int.
    applicant_id = clb.data.split("_")[-1]
    user_locale = clb.from_user.language_code

    await app.send_message(configGet("admin_group"), f"Refused by {clb.from_user.id}")
    await app.send_message(int(applicant_id), "You got refused")

    edited_markup = [[InlineKeyboardButton(text=locale("declined", "button"), callback_data="nothing")]]
    await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))

    await clb.answer(text=locale("sub_block", "callback", locale=user_locale).format(applicant_id), show_alert=True)
# ==============================================================================================================================
# Any other input ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.private)
async def any_stage(app, msg):
    """Treat any other private message as the answer to the current question.

    The sender's stage selects what happens:

    * stage 2 (age): the text must be an integer. Joke values and ages below
      the configured minimum get a dedicated reply and the question is asked
      again; otherwise the next question follows.
    * any other stage up to 9: the text is stored as the answer, the next
      question is asked and the stage is incremented.
    * above 9: the text is stored as the last answer and a summary is shown
      with the confirm keyboard, unless the application is already sent or
      confirmed.

    Args:
        app: The client that received the update.
        msg: The incoming message.
    """
    profile = str(msg.from_user.id)
    user_stage = configGet("stage", file=profile)

    if user_stage == 2:
        # Parse once, and only guard the parse itself. TypeError covers
        # messages without text (msg.text is None), e.g. media.
        try:
            age = int(msg.text)
        except (ValueError, TypeError):
            await msg.reply_text(
                locale("question2_invalid", "message"),
                reply_markup=ForceReply(placeholder=locale(f"question{user_stage}", "force_reply")),
            )
            return

        # The parsed value is stored even when it is rejected below; the
        # stage is not advanced in that case, so a later answer overwrites it.
        configSet(str(user_stage), age, "application", file=profile)

        if age in (-1, 0, 128, 256, 512, 1024, 2048) or age >= 100:
            await msg.reply_text(
                locale("question2_joke", "message"),
                reply_markup=ForceReply(placeholder=locale("question2", "force_reply")),
            )
        elif age < configGet("age_allowed"):
            await msg.reply_text(
                locale("question2_underage", "message"),
                reply_markup=ForceReply(placeholder=locale("question2", "force_reply")),
            )
        else:
            await msg.reply_text(
                locale(f"question{user_stage+1}", "message"),
                reply_markup=ForceReply(placeholder=locale(f"question{user_stage+1}", "force_reply")),
            )
            configSet("stage", user_stage+1, file=profile)
        return

    if user_stage <= 9:
        # NOTE(review): stage 0 (user has not pressed the welcome button yet)
        # also lands here and is stored under key "0" - confirm this is intended.
        await msg.reply_text(
            locale(f"question{user_stage+1}", "message"),
            reply_markup=ForceReply(placeholder=locale(f"question{user_stage+1}", "force_reply")),
        )
        configSet(str(user_stage), str(msg.text), "application", file=profile)
        configSet("stage", user_stage+1, file=profile)
        return

    if configGet("sent", file=profile) or configGet("confirmed", file=profile):
        await msg.reply_text("You are already done, wait!")
        return

    # Store the final answer BEFORE building the summary, otherwise the
    # summary shows the previous value of the last question.
    configSet(str(user_stage), str(msg.text), "application", file=profile)

    application = configGet("application", file=profile)
    application_content = [
        f"{locale(f'question{index}', 'message', 'question_titles')} {answer}"
        for index, answer in enumerate(application.values(), start=1)
    ]

    await msg.reply_text(
        locale("confirm", "message").format("\n".join(application_content)),
        reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard"), resize_keyboard=True),
    )
    # "sent" and "application_date" are written once the user confirms.
# ==============================================================================================================================
if __name__ == "__main__":

    logWrite(f"Starting up with pid {pid}")

    # The client is driven with plain blocking calls instead of an async
    # main(). Author's note: performance was measured to be almost the same
    # and this form is more convenient.
    app.start() # type: ignore
    app.send_message(configGet("owner"), f"Starting up with pid `{pid}`") # type: ignore

    # # Registering user commands
    # commands_list = []
    # for command in configGet("commands"):
    #     commands_list.append(BotCommand(command, configGet("commands")[command]))
    # app.set_bot_commands(commands_list) # type: ignore

    # Registering admin commands. The command table is read once instead of
    # once per command.
    # for command in configGet("commands"):
    #     commands_admin_list.append(BotCommand(command, configGet("commands")[command]))
    admin_commands = configGet("commands_admin")
    commands_admin_list = [
        BotCommand(command, description)
        for command, description in admin_commands.items()
    ]

    for admin in configGet("admins"):
        try:
            app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin)) # type: ignore
        except bad_request_400.PeerIdInvalid:
            # Best effort: one unreachable admin must not stop start-up, but
            # leave a trace (presumably this admin never started the bot).
            logWrite(f"Could not register admin commands for {admin}: peer id is invalid")

    app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner"))) # type: ignore

    # Block until the process is asked to stop.
    idle()

    app.send_message(configGet("owner"), f"Shutting with pid `{pid}`") # type: ignore
    app.stop() # type: ignore

    killProc(pid)