Modularity overhaul

This commit is contained in:
Profitroll 2022-12-05 18:49:51 +01:00
parent 22a2224af1
commit 3abd1e8ad3
22 changed files with 1012 additions and 914 deletions

937
main.py
View File

@ -1,7 +1,4 @@
import asyncio
from datetime import datetime
from threading import Thread
from dateutil.relativedelta import relativedelta
from time import time
from os import getpid, path
from modules.birthdays import check_birthdays
@ -10,12 +7,8 @@ from modules.inline import *
from schedule import run_pending, every
from app import app
from pyrogram import filters
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.enums.chat_action import ChatAction
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, BotCommand, BotCommandScopeChat, ReplyKeyboardMarkup, ForceReply, ReplyKeyboardRemove, ChatPermissions
from modules.commands_register import commands_register
from pyrogram import idle # type: ignore
from pyrogram.errors.exceptions import bad_request_400
pid = getpid()
@ -29,878 +22,28 @@ for entry in [f"{configGet('data', 'locations')}{sep}applications.json", f"{conf
except:
pass
def should_quote(msg):
return True if msg.chat.type is not ChatType.PRIVATE else False
# Start command ================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["start"], prefixes=["/"]))
async def cmd_start(app, msg):
try:
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage != 0:
return
except FileNotFoundError:
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")
user_stage = configGet("stage", file=str(msg.from_user.id))
configSet(["telegram_id"], str(msg.from_user.username), file=str(msg.from_user.id))
configSet(["telegram_name"], f"{msg.from_user.first_name} {msg.from_user.last_name}", file=str(msg.from_user.id))
configSet(["telegram_phone"], str(msg.from_user.phone_number), file=str(msg.from_user.id))
configSet(["telegram_locale"], str(msg.from_user.language_code), file=str(msg.from_user.id))
logWrite(f"User {msg.from_user.id} started bot interaction")
await msg.reply_text(locale("start", "message"), reply_markup=ReplyKeyboardMarkup(locale("welcome", "keyboard"), resize_keyboard=True)) # type: ignore
# ==============================================================================================================================
# Shutdown command =============================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["kill", "die", "reboot"], prefixes=["/"]))
async def cmd_kill(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
logWrite(f"Shutting down bot with pid {pid}")
await msg.reply_text(f"Вимкнення бота з підом `{pid}`", quote=should_quote(msg))
killProc(pid)
# ==============================================================================================================================
# Rules command =============================================================================================================
default_rules_markup = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(locale("rules_home", "button"), callback_data="rules_home"),
InlineKeyboardButton(locale("rules_additional", "button"), callback_data="rules_additional")
],
[
InlineKeyboardButton("1", callback_data="rule_1"),
InlineKeyboardButton("2", callback_data="rule_2"),
InlineKeyboardButton("3", callback_data="rule_3")
],
[
InlineKeyboardButton("4", callback_data="rule_4"),
InlineKeyboardButton("5", callback_data="rule_5"),
InlineKeyboardButton("6", callback_data="rule_6")
],
[
InlineKeyboardButton("7", callback_data="rule_7"),
InlineKeyboardButton("8", callback_data="rule_8"),
InlineKeyboardButton("9", callback_data="rule_9")
]
]
)
@app.on_message(~ filters.scheduled & filters.private & filters.command(["rules"], prefixes=["/"]))
async def cmd_rules(app, msg):
await msg.reply_text(locale("rules_msg"), disable_web_page_preview=True, reply_markup=default_rules_markup)
# for rule_msg in locale("rules"):
# await msg.reply_text(rule_msg)
# ==============================================================================================================================
# Warn command =================================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["warn"], prefixes=["/"]))
async def cmd_warn(app, msg):
if msg.chat.id == configGet("destination_group"):
if msg.reply_to_message_id != None:
if isAnAdmin(msg.from_user.id):
warnings = jsonLoad(f"{configGet('data', 'locations')}{sep}warnings.json")
if str(msg.reply_to_message.from_user.id) not in warnings:
warnings[str(msg.reply_to_message.from_user.id)] = 1
else:
warnings[str(msg.reply_to_message.from_user.id)] += 1
jsonSave(warnings, f"{configGet('data', 'locations')}{sep}warnings.json")
await msg.reply_text(locale("warned", "message").format(msg.reply_to_message.from_user.first_name, msg.reply_to_message.from_user.id)) # type: ignore
# ==============================================================================================================================
# Message command ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["message"], prefixes=["/"]))
async def cmd_message(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
try:
destination = int(msg.command[1])
void = msg.command[2]
message = " ".join(msg.command[2:])
try:
await app.send_message(destination, message)
await msg.reply_text(locale("message_sent", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} sent message '{' '.join(msg.command[2:])}' to {destination}")
except bad_request_400.PeerIdInvalid:
await msg.reply_text(locale("message_no_user", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} tried to send message '{' '.join(msg.command[2:])}' to {destination} but 'PeerIdInvalid'")
except IndexError:
await msg.reply_text(locale("message_invalid_syntax", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} tried to send message but 'IndexError'")
except ValueError:
await msg.reply_text(locale("message_invalid_syntax", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} tried to send message but 'ValueError'")
# ==============================================================================================================================
# Warnings command =============================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["warnings"], prefixes=["/"]))
async def cmd_warnings(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
warnings = jsonLoad(f"{configGet('data', 'locations')}{sep}warnings.json")
if len(msg.command) <= 1:
await msg.reply_text(locale("syntax_warnings", "message"), quote=should_quote(msg))
if path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json"):
target_id = str(int(msg.command[1]))
target_name = "N/A"
else:
list_of_users = []
async for m in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=msg.command[1]):
list_of_users.append(m)
if len(list_of_users) != 0:
target = list_of_users[0].user
target_name = target.first_name
target_id = str(target.id)
else:
await msg.reply_text(locale("no_user_warnings", "message").format(msg.command[1])) # type: ignore
return
if target_id not in warnings:
await msg.reply_text(locale("no_warnings", "message").format(target_name, target_id), quote=should_quote(msg)) # type: ignore
else:
if warnings[target_id] <= 5:
await msg.reply_text(locale("warnings_1", "message").format(target_name, target_id, warnings[target_id]), quote=should_quote(msg)) # type: ignore
else:
await msg.reply_text(locale("warnings_2", "message").format(target_name, target_id, warnings[target_id]), quote=should_quote(msg)) # type: ignore
# ==============================================================================================================================
# Applications command =========================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["applications"], prefixes=["/"]))
async def cmd_applications(app, msg):
if (await isAnAdmin(msg.from_user.id)) or (msg.chat.id == configGet("admin_group")):
await app.send_chat_action(msg.chat.id, ChatAction.UPLOAD_DOCUMENT)
await msg.reply_document(document=f"{configGet('data', 'locations')}{sep}applications.json", quote=should_quote(msg))
# ==============================================================================================================================
# Applications command =========================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["application"], prefixes=["/"]))
async def cmd_application(app, msg):
if (await isAnAdmin(msg.from_user.id)) or (msg.chat.id == configGet("admin_group")):
try:
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json")["approved"]):
user_id = int(msg.command[1])
else:
list_of_users = []
async for m in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=msg.command[1]):
list_of_users.append(m)
user_id = list_of_users[0].user.id
try:
user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{user_id}.json")
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")[str(user_id)]
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(application['application']['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application']['2']} ({age.years} р.)")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application'][question]}")
i += 1
if user_data["sent"]:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_on_hold", "message")
else:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_not_send", "message")
logWrite(f"User {msg.from_user.id} requested application of {user_id}")
await msg.reply_text(locale("contact", "message").format(str(user_id), "\n".join(application_content), application_status), quote=should_quote(msg)) # type: ignore
except FileNotFoundError:
logWrite(f"User {msg.from_user.id} requested application of {user_id} but user does not exists")
await msg.reply_text(locale("contact_invalid", "message"), quote=should_quote(msg))
except IndexError:
await msg.reply_text(locale("application_invalid_syntax", "message"), quote=should_quote(msg))
# ==============================================================================================================================
# Reapply command ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["reapply"], prefixes=["/"]))
async def cmd_reapply(app, msg):
if configGet("approved", file=str(msg.from_user.id)) or configGet("refused", file=str(msg.from_user.id)):
if (configGet("stage", file=str(msg.from_user.id)) == 10) and not (configGet("sent", file=str(msg.from_user.id))):
left_chat = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == msg.from_user.id:
left_chat = False
if not left_chat:
configSet(["reapply"], True, file=str(msg.from_user.id))
configSet(["confirmed"], False, file=str(msg.from_user.id))
await welcome_pass(app, msg, once_again=True)
else:
await msg.reply_text(locale("reapply_left_chat", "message"), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("reapply_old_one", "button"), f"reapply_old_{msg.id}")
],
[
InlineKeyboardButton(locale("reapply_new_one", "button"), f"reapply_new_{msg.id}")
]
]))
else:
await msg.reply_text(locale("reapply_in_progress", "message").format(locale("confirm", "keyboard")[1][0]), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("applying_stop", "button"), f"reapply_stop_{msg.id}")
]
])) # type: ignore
else:
if configGet("sent", file=str(msg.from_user.id)):
await msg.reply_text(locale("reapply_forbidden", "message"))
else:
await msg.reply_text(locale("reapply_in_progress", "message").format(locale("confirm", "keyboard")[1][0]), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("applying_stop", "button"), f"reapply_stop_{msg.id}")
]
])) # type: ignore
# ==============================================================================================================================
# Welcome check ================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("welcome", "keyboard")[0][0]) | filters.regex(locale("return", "keyboard")[0][0])))
async def welcome_pass(app, msg, once_again: bool = True) -> None:
"""Set user's stage to 1 and start a fresh application
### Args:
* app (app): Pyrogram Client to use
* msg (Message): Message with .from_user.id attribute equal to the end-user ID whose application will be started
* once_again (bool, optional): Set to False if it's the first time as user applies. Defaults to True.
"""
if not once_again:
await msg.reply_text(locale("privacy_notice", "message"))
logWrite(f"User {msg.from_user.id} confirmed starting the application")
await msg.reply_text(locale("question1", "message"), reply_markup=ForceReply(placeholder=locale("question1", "force_reply"))) # type: ignore
configSet(["stage"], 1, file=str(msg.from_user.id))
configSet(["sent"], False, file=str(msg.from_user.id))
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("welcome", "keyboard")[1][0])))
async def welcome_reject(app, msg):
logWrite(f"User {msg.from_user.id} refused to start the application")
await msg.reply_text(locale("goodbye", "message"), reply_markup=ReplyKeyboardMarkup(locale("return", "keyboard"), resize_keyboard=True)) # type: ignore
# ==============================================================================================================================
# Confirmation =================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("confirm", "keyboard")[0][0])))
async def confirm_yes(app, msg):
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage == 10:
if not configGet("sent", file=str(msg.from_user.id)):
await msg.reply_text(locale("application_sent", "message"), reply_markup=ReplyKeyboardRemove())
applications = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
applications[str(msg.from_user.id)] = {
"approved": False,
"approved_by": None,
"approval_date": None,
"refused": False,
"refused_by": False,
"refusal_date": False,
"application_date": int(time()),
"application": configGet("application", file=str(msg.from_user.id))
}
jsonSave(applications, f"{configGet('data', 'locations')}{sep}applications.json")
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(configGet('application', file=str(msg.from_user.id))['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {configGet('application', file=str(msg.from_user.id))['2']} ({age.years} р.)")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {configGet('application', file=str(msg.from_user.id))[question]}")
i += 1
if configGet("reapply", file=str(msg.from_user.id)):
await app.send_message(chat_id=configGet("admin_group"), text=(locale("reapply_got", "message")).format(str(msg.from_user.id), msg.from_user.first_name, msg.from_user.last_name, msg.from_user.username, "\n".join(application_content)), parse_mode=ParseMode.MARKDOWN, reply_markup=InlineKeyboardMarkup( # type: ignore
[
[
InlineKeyboardButton(text=str(locale("reapply_yes", "button")), callback_data=f"reapply_yes_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("reapply_no", "button")), callback_data=f"reapply_no_{msg.from_user.id}")
]
]
)
)
else:
await app.send_message(chat_id=configGet("admin_group"), text=(locale("application_got", "message")).format(str(msg.from_user.id), msg.from_user.first_name, msg.from_user.last_name, msg.from_user.username, "\n".join(application_content)), parse_mode=ParseMode.MARKDOWN, reply_markup=InlineKeyboardMarkup( # type: ignore
[
[
InlineKeyboardButton(text=str(locale("sub_yes", "button")), callback_data=f"sub_yes_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sub_no", "button")), callback_data=f"sub_no_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sub_no_aggressive", "button")), callback_data=f"sub_no_aggresive_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sub_no_russian", "button")), callback_data=f"sub_no_russian_{msg.from_user.id}")
]
]
)
)
logWrite(f"User {msg.from_user.id} sent his application and it will now be reviewed")
configSet(["sent"], True, file=str(msg.from_user.id))
configSet(["confirmed"], True, file=str(msg.from_user.id))
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("confirm", "keyboard")[1][0])))
async def confirm_no(app, msg):
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage == 10:
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")
configSet(["telegram_id"], str(msg.from_user.username), file=str(msg.from_user.id))
configSet(["telegram_name"], f"{msg.from_user.first_name} {msg.from_user.last_name}", file=str(msg.from_user.id))
configSet(["telegram_phone"], str(msg.from_user.phone_number), file=str(msg.from_user.id))
configSet(["telegram_locale"], str(msg.from_user.language_code), file=str(msg.from_user.id))
await welcome_pass(app, msg, once_again=True)
logWrite(f"User {msg.from_user.id} restarted the application due to typo in it")
# ==============================================================================================================================
# Callbacks application ========================================================================================================
@app.on_callback_query(filters.regex("sub_yes_[\s\S]*")) # type: ignore
async def callback_query_accept(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("approved_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} got approved by {clb.from_user.id}")
need_link = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == int(fullclb[2]):
need_link = False
if need_link:
link = await app.create_chat_invite_link(configGet("destination_group"), name=f"Invite for {fullclb[2]}", member_limit=1) #, expire_date=datetime.now()+timedelta(days=1))
await app.send_message(int(fullclb[2]), locale("read_rules", "message"))
for rule_msg in locale("rules"):
await app.send_message(int(fullclb[2]), rule_msg)
await app.send_message(int(fullclb[2]), locale("approved", "message"), reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(str(locale("join", "button")), url=link.invite_link)
]]
))
configSet(["link"], link.invite_link, file=fullclb[2])
logWrite(f"User {fullclb[2]} got an invite link {link.invite_link}")
else:
await app.send_message(int(fullclb[2]), locale("approved_joined", "message"))
configSet(["approved"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["approved"] = True
application[fullclb[2]]["approved_by"] = clb.from_user.id
application[fullclb[2]]["approval_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("accepted", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_accepted", "callback").format(fullclb[2]), show_alert=True) # type: ignore
@app.on_callback_query(filters.regex("sub_no_aggressive_[\s\S]*")) # type: ignore
async def callback_query_refuse_aggressive(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by_agr", "message").format(clb.from_user.first_name, fullclb[3]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[3]), locale("refused", "message"))
logWrite(f"User {fullclb[3]} got refused by {clb.from_user.id} due to being aggressive")
configSet(["refused"], True, file=fullclb[3])
configSet(["sent"], False, file=fullclb[3])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[3]]["refused"] = True
application[fullclb[3]]["refused_by"] = clb.from_user.id
application[fullclb[3]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_no_aggressive", "callback").format(fullclb[3]), show_alert=True) # type: ignore
@app.on_callback_query(filters.regex("sub_no_russian_[\s\S]*")) # type: ignore
async def callback_query_refuse_russian(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by_rus", "message").format(clb.from_user.first_name, fullclb[3]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[3]), locale("refused", "message"))
await app.send_message(int(fullclb[3]), locale("refused_russian", "message"))
logWrite(f"User {fullclb[3]} got refused by {clb.from_user.id} due to being russian")
configSet(["refused"], True, file=fullclb[3])
configSet(["sent"], False, file=fullclb[3])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[3]]["refused"] = True
application[fullclb[3]]["refused_by"] = clb.from_user.id
application[fullclb[3]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_no_russian", "callback").format(fullclb[3]), show_alert=True) # type: ignore
@app.on_callback_query(filters.regex("sub_no_[\s\S]*")) # type: ignore
async def callback_query_refuse(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[2]), locale("refused", "message"))
logWrite(f"User {fullclb[2]} got refused by {clb.from_user.id}")
configSet(["refused"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["refused"] = True
application[fullclb[2]]["refused_by"] = clb.from_user.id
application[fullclb[2]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_refused", "callback").format(fullclb[2]), show_alert=True) # type: ignore
# ==============================================================================================================================
# Callbacks application ========================================================================================================
@app.on_callback_query(filters.regex("reapply_yes_[\s\S]*")) # type: ignore
async def callback_reapply_query_accept(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("approved_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} got their reapplication approved by {clb.from_user.id}")
await app.send_message(int(fullclb[2]), locale("approved_joined", "message"))
configSet(["approved"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["approved"] = True
application[fullclb[2]]["approved_by"] = clb.from_user.id
application[fullclb[2]]["approval_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("accepted", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_accepted", "callback").format(fullclb[2]), show_alert=True) # type: ignore
need_link = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == int(fullclb[2]):
need_link = False
if need_link:
link = await app.create_chat_invite_link(configGet("destination_group"), name=f"Invite for {fullclb[2]}", member_limit=1) #, expire_date=datetime.now()+timedelta(days=1))
await app.send_message(int(fullclb[2]), locale("read_rules", "message"))
for rule_msg in locale("rules"):
await app.send_message(int(fullclb[2]), rule_msg)
await app.send_message(int(fullclb[2]), locale("approved", "message"), reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(str(locale("join", "button")), url=link.invite_link)
]]
))
configSet(["link"], link.invite_link, file=fullclb[2])
logWrite(f"User {fullclb[2]} got an invite link {link.invite_link}")
else:
await app.send_message(int(fullclb[2]), locale("approved_joined", "message"))
@app.on_callback_query(filters.regex("reapply_no_[\s\S]*")) # type: ignore
async def callback_query_reapply_refuse(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[2]), locale("refused", "message"))
logWrite(f"User {fullclb[2]} got their reapplication refused by {clb.from_user.id}")
configSet(["refused"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["refused"] = True
application[fullclb[2]]["refused_by"] = clb.from_user.id
application[fullclb[2]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_refused", "callback").format(fullclb[2]), show_alert=True) # type: ignore
# Use old application when user reapplies after leaving the chat
@app.on_callback_query(filters.regex("reapply_old_[\s\S]*")) # type: ignore
async def callback_query_reapply_old(app, clb):
fullclb = clb.data.split("_")
message = await app.get_messages(clb.from_user.id, int(fullclb[2]))
configSet(["approved"], False, file=str(clb.from_user.id))
configSet(["refused"], False, file=str(clb.from_user.id))
await confirm_yes(app, message)
await clb.message.edit(clb.message.text, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(locale("done", "button"), "nothing")]]))
# Start a new application when user reapplies after leaving the chat
@app.on_callback_query(filters.regex("reapply_new_[\s\S]*")) # type: ignore
async def callback_query_reapply_new(app, clb):
fullclb = clb.data.split("_")
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{clb.from_user.id}.json")
configSet(["telegram_id"], str(clb.from_user.username), file=str(clb.from_user.id))
configSet(["telegram_name"], f"{clb.from_user.first_name} {clb.from_user.last_name}", file=str(clb.from_user.id))
configSet(["telegram_phone"], str(clb.from_user.phone_number), file=str(clb.from_user.id))
configSet(["telegram_locale"], str(clb.from_user.language_code), file=str(clb.from_user.id))
await clb.answer(locale("reapply_stopped", "callback"))
message = await app.get_messages(clb.from_user.id, int(fullclb[2]))
await welcome_pass(app, message, once_again=True)
logWrite(f"User {clb.from_user.id} restarted the application after leaving the chat earlier")
await clb.message.edit(clb.message.text, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(locale("done", "button"), "nothing")]]))
# Abort application fill in progress and restart it
@app.on_callback_query(filters.regex("reapply_stop_[\s\S]*")) # type: ignore
async def callback_query_reapply_stop(app, clb):
fullclb = clb.data.split("_")
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{clb.from_user.id}.json")
configSet(["telegram_id"], str(clb.from_user.username), file=str(clb.from_user.id))
configSet(["telegram_name"], f"{clb.from_user.first_name} {clb.from_user.last_name}", file=str(clb.from_user.id))
configSet(["telegram_phone"], str(clb.from_user.phone_number), file=str(clb.from_user.id))
configSet(["telegram_locale"], str(clb.from_user.language_code), file=str(clb.from_user.id))
await clb.answer(locale("reapply_stopped", "callback"))
message = await app.get_messages(clb.from_user.id, int(fullclb[2]))
await welcome_pass(app, message, once_again=True)
logWrite(f"User {clb.from_user.id} restarted the application due to typo in it")
await clb.message.edit(clb.message.text, reply_markup=None)
# ==============================================================================================================================
# Callbacks sus users ==========================================================================================================
@app.on_callback_query(filters.regex("sus_allow_[\s\S]*")) # type: ignore
async def callback_query_sus_allow(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("sus_allowed_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} was allowed to join with another link by {clb.from_user.id}")
edited_markup = [[InlineKeyboardButton(text=str(locale("sus_allowed", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sus_allowed", "callback").format(fullclb[2]), show_alert=True) # type: ignore
await app.restrict_chat_member(configGet("destination_group"), int(fullclb[2]), permissions=ChatPermissions(
can_send_messages=True,
can_send_media_messages=True,
can_send_other_messages=True,
can_send_polls=True
)
)
@app.on_callback_query(filters.regex("sus_refuse_[\s\S]*")) # type: ignore
async def callback_query_sus_refuse(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("sus_refused_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} was refused to join with another link by {clb.from_user.id}")
edited_markup = [[InlineKeyboardButton(text=str(locale("sus_refused", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sus_refused", "callback").format(fullclb[2]), show_alert=True) # type: ignore
await app.ban_chat_member(configGet("destination_group"), int(fullclb[2]))
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{fullclb[2]}.json")
configSet(["stage"], 10, file=fullclb[2])
configSet(["refused"], True, file=fullclb[2])
configSet(["refused_by"], clb.from_user.id, file=fullclb[2])
# ==============================================================================================================================
# Callback empty ===============================================================================================================
@app.on_callback_query(filters.regex("nothing")) # type: ignore
async def callback_query_nothing(app, clb):
await clb.answer(text=locale("nothing", "callback"))
# ==============================================================================================================================
# Callback rule ================================================================================================================
@app.on_callback_query(filters.regex("rule_[\s\S]*")) # type: ignore
async def callback_query_rule(app, clb):
fullclb = clb.data.split("_")
logWrite(f"User {clb.from_user.id} requested to check out rule {fullclb[1]}")
rule_num = int(fullclb[1])
if rule_num == len(locale("rules")):
lower_buttons = [
InlineKeyboardButton(locale("rules_prev", "button"), callback_data=f"rule_{rule_num-1}")
]
elif rule_num == 1:
lower_buttons = [
InlineKeyboardButton(locale("rules_next", "button"), callback_data=f"rule_{rule_num+1}")
]
else:
lower_buttons = [
InlineKeyboardButton(locale("rules_prev", "button"), callback_data=f"rule_{rule_num-1}"),
InlineKeyboardButton(locale("rules_next", "button"), callback_data=f"rule_{rule_num+1}")
]
try:
await clb.message.edit(text=locale("rules")[rule_num-1], disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(locale("rules_home", "button"), callback_data="rules_home"),
InlineKeyboardButton(locale("rules_additional", "button"), callback_data="rules_additional")
],
lower_buttons
]
)
)
except bad_request_400.MessageNotModified:
pass
await clb.answer(text=locale("rules_page", "callback").format(fullclb[1])) # type: ignore
@app.on_callback_query(filters.regex("rules_home")) # type: ignore
async def callback_query_rules_home(app, clb):
logWrite(f"User {clb.from_user.id} requested to check out homepage rules")
try:
await clb.message.edit(text=locale("rules_msg"), disable_web_page_preview=True, reply_markup=default_rules_markup)
except bad_request_400.MessageNotModified:
pass
await clb.answer(text=locale("rules_home", "callback")) # type: ignore
@app.on_callback_query(filters.regex("rules_additional")) # type: ignore
async def callback_query_rules_additional(app, clb):
logWrite(f"User {clb.from_user.id} requested to check out additional rules")
try:
await clb.message.edit(text=locale("rules_additional"), disable_web_page_preview=True, reply_markup=default_rules_markup)
except bad_request_400.MessageNotModified:
pass
await clb.answer(text=locale("rules_additional", "callback")) # type: ignore
# ==============================================================================================================================
# Contact getting ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.contact & filters.private)
async def get_contact(app, msg):
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")["approved"]) or (await isAnAdmin(msg.from_user.id)):
if msg.contact.user_id != None:
try:
user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.contact.user_id}.json")
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")[str(msg.contact.user_id)]
application_content = []
i = 1
for question in application["application"]:
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(application['application']['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application']['2']} ({age.years} р.)")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application'][question]}")
i += 1
if user_data["sent"]:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_on_hold", "message")
else:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_not_send", "message")
logWrite(f"User {msg.from_user.id} requested application of {msg.contact.user_id}")
await msg.reply_text(locale("contact", "message").format(str(msg.contact.user_id), "\n".join(application_content), application_status)) # type: ignore
except FileNotFoundError:
logWrite(f"User {msg.from_user.id} requested application of {msg.contact.user_id} but user does not exists")
await msg.reply_text(locale("contact_invalid", "message"))
else:
logWrite(f"User {msg.from_user.id} requested application of someone but user is not telegram user")
await msg.reply_text(locale("contact_not_member", "message"))
# ==============================================================================================================================
# Any other input ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.private)
async def any_stage(app, msg):
if msg.via_bot is None:
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage == 1:
await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))
logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
configSet(["stage"], user_stage+1, file=str(msg.from_user.id))
elif user_stage == 2:
try:
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
input_dt = datetime.strptime(msg.text, "%d.%m.%Y")
if datetime.now() <= input_dt:
logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to joking")
await msg.reply_text(locale("question2_joke", "message"), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply"))))
elif ((datetime.now() - input_dt).days) < ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_allowed"))).days):
logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to being underage")
await msg.reply_text(locale("question2_underage", "message").format(str(configGet("age_allowed"))), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply")))) # type: ignore
else:
logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))
configSet(["stage"], user_stage+1, file=str(msg.from_user.id))
except ValueError:
logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to sending invalid date format")
await msg.reply_text(locale(f"question2_invalid", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage}", "force_reply"))))
else:
if user_stage <= 9:
logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
configSet(["stage"], user_stage+1, file=str(msg.from_user.id))
else:
if not configGet("sent", file=str(msg.from_user.id)):
if not configGet("confirmed", file=str(msg.from_user.id)):
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {configGet('application', file=str(msg.from_user.id))[question]}")
i += 1
await msg.reply_text(locale("confirm", "message").format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard"), resize_keyboard=True)) # type: ignore
#configSet("sent", True, file=str(msg.from_user.id))
#configSet("application_date", int(time()), file=str(msg.from_user.id))
else:
if not configGet("approved", file=str(msg.from_user.id)) and not configGet("refused", file=str(msg.from_user.id)):
await msg.reply_text(locale("already_sent", "message"))
else:
if not configGet("approved", file=str(msg.from_user.id)) and not configGet("refused", file=str(msg.from_user.id)):
await msg.reply_text(locale("already_sent", "message"))
@app.on_message(~ filters.scheduled & filters.group)
async def message_in_group(app, msg):
if (msg.chat is not None) and (msg.via_bot is not None):
if (msg.via_bot.id == configGet("bot_id")) and (msg.chat.id == configGet("destination_group")):
if configGet("remove_application_time") > 0:
logWrite(f"User {msg.from_user.id} requested application in destination group, removing in {configGet('remove_application_time')} minutes")
await asyncio.sleep(configGet("remove_application_time")*60)
await msg.delete()
logWrite(f"Removed application requested by {msg.from_user.id} in destination group")
# ==============================================================================================================================
# Filter users on join =========================================================================================================
@app.on_chat_member_updated(group=configGet("destination_group"))
#@app.on_message(filters.new_chat_members, group=configGet("destination_group"))
async def filter_join(app, member):
if member.invite_link != None:
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json")["approved"]) or (await isAnAdmin(member.from_user.id)):
if configGet("link", file=str(member.from_user.id)) == member.invite_link.invite_link:
return
if await isAnAdmin(member.invite_link.creator.id):
return
await app.send_message(configGet("admin_group"), f"User **{member.from_user.first_name}** (`{member.from_user.id}`) joined the chat not with his personal link", reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(text=str(locale("sus_allow", "button")), callback_data=f"sus_allow_{member.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sus_refuse", "button")), callback_data=f"sus_refuse_{member.from_user.id}")
]
]
))
await app.restrict_chat_member(member.chat.id, member.from_user.id, permissions=ChatPermissions(
can_send_messages=False,
can_send_media_messages=False,
can_send_other_messages=False,
can_send_polls=False
)
)
# ==============================================================================================================================
# Importing
from modules.commands.application import *
from modules.commands.applications import *
from modules.commands.message import *
from modules.commands.reapply import *
from modules.commands.reboot import *
from modules.commands.rules import *
from modules.commands.start import *
from modules.commands.warn import *
from modules.commands.warnings import *
from modules.callbacks.nothing import *
from modules.callbacks.reapply import *
from modules.callbacks.rules import *
from modules.callbacks.sub import *
from modules.callbacks.sus import *
from modules.handlers.confirmation import *
from modules.handlers.contact import *
from modules.handlers.group_join import *
from modules.handlers.welcome import *
from modules.handlers.everything import *
if __name__ == "__main__":
@ -932,39 +75,7 @@ if __name__ == "__main__":
app.send_message(configGet("owner"), f"Starting up with pid `{pid}`") # type: ignore
# Registering user commands
commands_list = []
for command in configGet("commands"):
commands_list.append(BotCommand(command, configGet("commands")[command]))
app.set_bot_commands(commands_list) # type: ignore
# Registering admin commands
commands_admin_list = []
for command in configGet("commands"):
commands_admin_list.append(BotCommand(command, configGet("commands")[command]))
for command in configGet("commands_admin"):
commands_admin_list.append(BotCommand(command, configGet("commands_admin")[command]))
for admin in configGet("admins"):
try:
app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin)) # type: ignore
except bad_request_400.PeerIdInvalid:
pass
app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner"))) # type: ignore
# Registering admin group commands
commands_group_admin_list = []
for command in configGet("commands_group_admin"):
commands_group_admin_list.append(BotCommand(command, configGet("commands_group_admin")[command]))
app.set_bot_commands(commands_group_admin_list, scope=BotCommandScopeChat(chat_id=configGet("admin_group"))) # type: ignore
# Registering destination group commands
commands_group_destination_list = []
for command in configGet("commands_group_destination"):
commands_group_destination_list.append(BotCommand(command, configGet("commands_group_destination")[command]))
app.set_bot_commands(commands_group_destination_list, scope=BotCommandScopeChat(chat_id=configGet("destination_group"))) # type: ignore
commands_register(app)
idle()

View File

@ -0,0 +1,9 @@
from app import app
from pyrogram import filters
from modules.utils import locale
# Callback empty ===============================================================================================================
@app.on_callback_query(filters.regex("nothing")) # type: ignore
async def callback_query_nothing(app, clb):
await clb.answer(text=locale("nothing", "callback"))
# ==============================================================================================================================

View File

@ -0,0 +1,126 @@
from os import sep
from time import time
from app import app
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from modules.utils import configGet, configSet, jsonLoad, jsonSave, locale, logWrite
from modules.handlers.confirmation import confirm_yes
from modules.handlers.welcome import welcome_pass
# Callbacks reapply ============================================================================================================
@app.on_callback_query(filters.regex("reapply_yes_[\s\S]*")) # type: ignore
async def callback_reapply_query_accept(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("approved_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} got their reapplication approved by {clb.from_user.id}")
await app.send_message(int(fullclb[2]), locale("approved_joined", "message"))
configSet(["approved"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["approved"] = True
application[fullclb[2]]["approved_by"] = clb.from_user.id
application[fullclb[2]]["approval_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("accepted", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_accepted", "callback").format(fullclb[2]), show_alert=True) # type: ignore
need_link = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == int(fullclb[2]):
need_link = False
if need_link:
link = await app.create_chat_invite_link(configGet("destination_group"), name=f"Invite for {fullclb[2]}", member_limit=1) #, expire_date=datetime.now()+timedelta(days=1))
await app.send_message(int(fullclb[2]), locale("read_rules", "message"))
for rule_msg in locale("rules"):
await app.send_message(int(fullclb[2]), rule_msg)
await app.send_message(int(fullclb[2]), locale("approved", "message"), reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(str(locale("join", "button")), url=link.invite_link)
]]
))
configSet(["link"], link.invite_link, file=fullclb[2])
logWrite(f"User {fullclb[2]} got an invite link {link.invite_link}")
else:
await app.send_message(int(fullclb[2]), locale("approved_joined", "message"))
@app.on_callback_query(filters.regex("reapply_no_[\s\S]*")) # type: ignore
async def callback_query_reapply_refuse(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[2]), locale("refused", "message"))
logWrite(f"User {fullclb[2]} got their reapplication refused by {clb.from_user.id}")
configSet(["refused"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["refused"] = True
application[fullclb[2]]["refused_by"] = clb.from_user.id
application[fullclb[2]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_refused", "callback").format(fullclb[2]), show_alert=True) # type: ignore
# Use old application when user reapplies after leaving the chat
@app.on_callback_query(filters.regex("reapply_old_[\s\S]*")) # type: ignore
async def callback_query_reapply_old(app, clb):
fullclb = clb.data.split("_")
message = await app.get_messages(clb.from_user.id, int(fullclb[2]))
configSet(["approved"], False, file=str(clb.from_user.id))
configSet(["refused"], False, file=str(clb.from_user.id))
await confirm_yes(app, message)
await clb.message.edit(clb.message.text, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(locale("done", "button"), "nothing")]]))
# Start a new application when user reapplies after leaving the chat
@app.on_callback_query(filters.regex("reapply_new_[\s\S]*")) # type: ignore
async def callback_query_reapply_new(app, clb):
fullclb = clb.data.split("_")
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{clb.from_user.id}.json")
configSet(["telegram_id"], str(clb.from_user.username), file=str(clb.from_user.id))
configSet(["telegram_name"], f"{clb.from_user.first_name} {clb.from_user.last_name}", file=str(clb.from_user.id))
configSet(["telegram_phone"], str(clb.from_user.phone_number), file=str(clb.from_user.id))
configSet(["telegram_locale"], str(clb.from_user.language_code), file=str(clb.from_user.id))
await clb.answer(locale("reapply_stopped", "callback"))
message = await app.get_messages(clb.from_user.id, int(fullclb[2]))
await welcome_pass(app, message, once_again=True)
logWrite(f"User {clb.from_user.id} restarted the application after leaving the chat earlier")
await clb.message.edit(clb.message.text, reply_markup=InlineKeyboardMarkup([[InlineKeyboardButton(locale("done", "button"), "nothing")]]))
# Abort application fill in progress and restart it
@app.on_callback_query(filters.regex("reapply_stop_[\s\S]*")) # type: ignore
async def callback_query_reapply_stop(app, clb):
fullclb = clb.data.split("_")
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{clb.from_user.id}.json")
configSet(["telegram_id"], str(clb.from_user.username), file=str(clb.from_user.id))
configSet(["telegram_name"], f"{clb.from_user.first_name} {clb.from_user.last_name}", file=str(clb.from_user.id))
configSet(["telegram_phone"], str(clb.from_user.phone_number), file=str(clb.from_user.id))
configSet(["telegram_locale"], str(clb.from_user.language_code), file=str(clb.from_user.id))
await clb.answer(locale("reapply_stopped", "callback"))
message = await app.get_messages(clb.from_user.id, int(fullclb[2]))
await welcome_pass(app, message, once_again=True)
logWrite(f"User {clb.from_user.id} restarted the application due to typo in it")
await clb.message.edit(clb.message.text, reply_markup=None)
# ==============================================================================================================================

View File

@ -0,0 +1,70 @@
from app import app
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from pyrogram.errors import bad_request_400
from modules.utils import locale, logWrite
from modules.commands.rules import default_rules_markup
# Callback rule ================================================================================================================
@app.on_callback_query(filters.regex("rule_[\s\S]*")) # type: ignore
async def callback_query_rule(app, clb):
fullclb = clb.data.split("_")
logWrite(f"User {clb.from_user.id} requested to check out rule {fullclb[1]}")
rule_num = int(fullclb[1])
if rule_num == len(locale("rules")):
lower_buttons = [
InlineKeyboardButton(locale("rules_prev", "button"), callback_data=f"rule_{rule_num-1}")
]
elif rule_num == 1:
lower_buttons = [
InlineKeyboardButton(locale("rules_next", "button"), callback_data=f"rule_{rule_num+1}")
]
else:
lower_buttons = [
InlineKeyboardButton(locale("rules_prev", "button"), callback_data=f"rule_{rule_num-1}"),
InlineKeyboardButton(locale("rules_next", "button"), callback_data=f"rule_{rule_num+1}")
]
try:
await clb.message.edit(text=locale("rules")[rule_num-1], disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(locale("rules_home", "button"), callback_data="rules_home"),
InlineKeyboardButton(locale("rules_additional", "button"), callback_data="rules_additional")
],
lower_buttons
]
)
)
except bad_request_400.MessageNotModified:
pass
await clb.answer(text=locale("rules_page", "callback").format(fullclb[1])) # type: ignore
@app.on_callback_query(filters.regex("rules_home")) # type: ignore
async def callback_query_rules_home(app, clb):
logWrite(f"User {clb.from_user.id} requested to check out homepage rules")
try:
await clb.message.edit(text=locale("rules_msg"), disable_web_page_preview=True, reply_markup=default_rules_markup)
except bad_request_400.MessageNotModified:
pass
await clb.answer(text=locale("rules_home", "callback")) # type: ignore
@app.on_callback_query(filters.regex("rules_additional")) # type: ignore
async def callback_query_rules_additional(app, clb):
logWrite(f"User {clb.from_user.id} requested to check out additional rules")
try:
await clb.message.edit(text=locale("rules_additional"), disable_web_page_preview=True, reply_markup=default_rules_markup)
except bad_request_400.MessageNotModified:
pass
await clb.answer(text=locale("rules_additional", "callback")) # type: ignore
# ==============================================================================================================================

125
modules/callbacks/sub.py Normal file
View File

@ -0,0 +1,125 @@
from os import sep
from time import time
from app import app
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from modules.utils import configGet, configSet, jsonLoad, jsonSave, locale, logWrite
# Callbacks application ========================================================================================================
@app.on_callback_query(filters.regex("sub_yes_[\s\S]*")) # type: ignore
async def callback_query_accept(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("approved_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} got approved by {clb.from_user.id}")
need_link = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == int(fullclb[2]):
need_link = False
if need_link:
link = await app.create_chat_invite_link(configGet("destination_group"), name=f"Invite for {fullclb[2]}", member_limit=1) #, expire_date=datetime.now()+timedelta(days=1))
await app.send_message(int(fullclb[2]), locale("read_rules", "message"))
for rule_msg in locale("rules"):
await app.send_message(int(fullclb[2]), rule_msg)
await app.send_message(int(fullclb[2]), locale("approved", "message"), reply_markup=InlineKeyboardMarkup(
[[
InlineKeyboardButton(str(locale("join", "button")), url=link.invite_link)
]]
))
configSet(["link"], link.invite_link, file=fullclb[2])
logWrite(f"User {fullclb[2]} got an invite link {link.invite_link}")
else:
await app.send_message(int(fullclb[2]), locale("approved_joined", "message"))
configSet(["approved"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["approved"] = True
application[fullclb[2]]["approved_by"] = clb.from_user.id
application[fullclb[2]]["approval_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("accepted", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_accepted", "callback").format(fullclb[2]), show_alert=True) # type: ignore
@app.on_callback_query(filters.regex("sub_no_aggressive_[\s\S]*")) # type: ignore
async def callback_query_refuse_aggressive(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by_agr", "message").format(clb.from_user.first_name, fullclb[3]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[3]), locale("refused", "message"))
logWrite(f"User {fullclb[3]} got refused by {clb.from_user.id} due to being aggressive")
configSet(["refused"], True, file=fullclb[3])
configSet(["sent"], False, file=fullclb[3])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[3]]["refused"] = True
application[fullclb[3]]["refused_by"] = clb.from_user.id
application[fullclb[3]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_no_aggressive", "callback").format(fullclb[3]), show_alert=True) # type: ignore
@app.on_callback_query(filters.regex("sub_no_russian_[\s\S]*")) # type: ignore
async def callback_query_refuse_russian(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by_rus", "message").format(clb.from_user.first_name, fullclb[3]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[3]), locale("refused", "message"))
await app.send_message(int(fullclb[3]), locale("refused_russian", "message"))
logWrite(f"User {fullclb[3]} got refused by {clb.from_user.id} due to being russian")
configSet(["refused"], True, file=fullclb[3])
configSet(["sent"], False, file=fullclb[3])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[3]]["refused"] = True
application[fullclb[3]]["refused_by"] = clb.from_user.id
application[fullclb[3]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_no_russian", "callback").format(fullclb[3]), show_alert=True) # type: ignore
@app.on_callback_query(filters.regex("sub_no_[\s\S]*")) # type: ignore
async def callback_query_refuse(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("refused_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
await app.send_message(int(fullclb[2]), locale("refused", "message"))
logWrite(f"User {fullclb[2]} got refused by {clb.from_user.id}")
configSet(["refused"], True, file=fullclb[2])
configSet(["sent"], False, file=fullclb[2])
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
application[fullclb[2]]["refused"] = True
application[fullclb[2]]["refused_by"] = clb.from_user.id
application[fullclb[2]]["refusal_date"] = int(time())
jsonSave(application, f"{configGet('data', 'locations')}{sep}applications.json")
edited_markup = [[InlineKeyboardButton(text=str(locale("declined", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sub_refused", "callback").format(fullclb[2]), show_alert=True) # type: ignore
# ==============================================================================================================================

47
modules/callbacks/sus.py Normal file
View File

@ -0,0 +1,47 @@
from os import sep
from app import app
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ChatPermissions
from modules.utils import configGet, configSet, jsonLoad, jsonSave, locale, logWrite
# Callbacks sus users ==========================================================================================================
@app.on_callback_query(filters.regex("sus_allow_[\s\S]*")) # type: ignore
async def callback_query_sus_allow(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("sus_allowed_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} was allowed to join with another link by {clb.from_user.id}")
edited_markup = [[InlineKeyboardButton(text=str(locale("sus_allowed", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sus_allowed", "callback").format(fullclb[2]), show_alert=True) # type: ignore
await app.restrict_chat_member(configGet("destination_group"), int(fullclb[2]), permissions=ChatPermissions(
can_send_messages=True,
can_send_media_messages=True,
can_send_other_messages=True,
can_send_polls=True
)
)
@app.on_callback_query(filters.regex("sus_refuse_[\s\S]*")) # type: ignore
async def callback_query_sus_refuse(app, clb):
fullclb = clb.data.split("_")
await app.send_message(configGet("admin_group"), locale("sus_refused_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True) # type: ignore
logWrite(f"User {fullclb[2]} was refused to join with another link by {clb.from_user.id}")
edited_markup = [[InlineKeyboardButton(text=str(locale("sus_refused", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sus_refused", "callback").format(fullclb[2]), show_alert=True) # type: ignore
await app.ban_chat_member(configGet("destination_group"), int(fullclb[2]))
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{fullclb[2]}.json")
configSet(["stage"], 10, file=fullclb[2])
configSet(["refused"], True, file=fullclb[2])
configSet(["refused_by"], clb.from_user.id, file=fullclb[2])
# ==============================================================================================================================

View File

@ -0,0 +1,56 @@
from os import sep, path
from datetime import datetime
from app import app, isAnAdmin
from pyrogram import filters
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from modules.utils import configGet, jsonLoad, logWrite, locale, should_quote
from dateutil.relativedelta import relativedelta
# Applications command =========================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["application"], prefixes=["/"]))
async def cmd_application(app, msg):
if (await isAnAdmin(msg.from_user.id)) or (msg.chat.id == configGet("admin_group")):
try:
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json")["approved"]):
user_id = int(msg.command[1])
else:
list_of_users = []
async for m in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=msg.command[1]):
list_of_users.append(m)
user_id = list_of_users[0].user.id
try:
user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{user_id}.json")
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")[str(user_id)]
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(application['application']['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application']['2']} ({age.years} р.)")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application'][question]}")
i += 1
if user_data["sent"]:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_on_hold", "message")
else:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_not_send", "message")
logWrite(f"User {msg.from_user.id} requested application of {user_id}")
await msg.reply_text(locale("contact", "message").format(str(user_id), "\n".join(application_content), application_status), quote=should_quote(msg)) # type: ignore
except FileNotFoundError:
logWrite(f"User {msg.from_user.id} requested application of {user_id} but user does not exists")
await msg.reply_text(locale("contact_invalid", "message"), quote=should_quote(msg))
except IndexError:
await msg.reply_text(locale("application_invalid_syntax", "message"), quote=should_quote(msg))
# ==============================================================================================================================

View File

@ -0,0 +1,14 @@
from os import sep
from app import app, isAnAdmin
from pyrogram import filters
from pyrogram.enums.chat_action import ChatAction
from modules.utils import configGet, should_quote
# Applications command =========================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["applications"], prefixes=["/"]))
async def cmd_applications(app, msg):
if (await isAnAdmin(msg.from_user.id)) or (msg.chat.id == configGet("admin_group")):
await app.send_chat_action(msg.chat.id, ChatAction.UPLOAD_DOCUMENT)
await msg.reply_document(document=f"{configGet('data', 'locations')}{sep}applications.json", quote=should_quote(msg))
# ==============================================================================================================================

View File

@ -0,0 +1,29 @@
from app import app, isAnAdmin
from pyrogram import filters
from pyrogram.errors import bad_request_400
from modules.utils import logWrite, locale, configGet, should_quote
# Message command ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["message"], prefixes=["/"]))
async def cmd_message(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
try:
destination = int(msg.command[1])
void = msg.command[2]
message = " ".join(msg.command[2:])
try:
await app.send_message(destination, message)
await msg.reply_text(locale("message_sent", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} sent message '{' '.join(msg.command[2:])}' to {destination}")
except bad_request_400.PeerIdInvalid:
await msg.reply_text(locale("message_no_user", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} tried to send message '{' '.join(msg.command[2:])}' to {destination} but 'PeerIdInvalid'")
except IndexError:
await msg.reply_text(locale("message_invalid_syntax", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} tried to send message but 'IndexError'")
except ValueError:
await msg.reply_text(locale("message_invalid_syntax", "message"), quote=should_quote(msg))
logWrite(f"Admin {msg.from_user.id} tried to send message but 'ValueError'")
# ==============================================================================================================================

View File

@ -0,0 +1,45 @@
from app import app
from pyrogram import filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from modules.utils import configGet, configSet, locale
from modules.handlers.welcome import welcome_pass
# Reapply command ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["reapply"], prefixes=["/"]))
async def cmd_reapply(app, msg):
if configGet("approved", file=str(msg.from_user.id)) or configGet("refused", file=str(msg.from_user.id)):
if (configGet("stage", file=str(msg.from_user.id)) == 10) and not (configGet("sent", file=str(msg.from_user.id))):
left_chat = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == msg.from_user.id:
left_chat = False
if not left_chat:
configSet(["reapply"], True, file=str(msg.from_user.id))
configSet(["confirmed"], False, file=str(msg.from_user.id))
await welcome_pass(app, msg, once_again=True)
else:
await msg.reply_text(locale("reapply_left_chat", "message"), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("reapply_old_one", "button"), f"reapply_old_{msg.id}")
],
[
InlineKeyboardButton(locale("reapply_new_one", "button"), f"reapply_new_{msg.id}")
]
]))
else:
await msg.reply_text(locale("reapply_in_progress", "message").format(locale("confirm", "keyboard")[1][0]), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("applying_stop", "button"), f"reapply_stop_{msg.id}")
]
])) # type: ignore
else:
if configGet("sent", file=str(msg.from_user.id)):
await msg.reply_text(locale("reapply_forbidden", "message"))
else:
await msg.reply_text(locale("reapply_in_progress", "message").format(locale("confirm", "keyboard")[1][0]), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("applying_stop", "button"), f"reapply_stop_{msg.id}")
]
])) # type: ignore
# ==============================================================================================================================

View File

@ -0,0 +1,16 @@
from app import app, isAnAdmin
from os import getpid
from pyrogram import filters
from modules.utils import configGet, logWrite, killProc, should_quote
pid = getpid()
# Shutdown command =============================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["kill", "die", "reboot"], prefixes=["/"]))
async def cmd_kill(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
logWrite(f"Shutting down bot with pid {pid}")
await msg.reply_text(f"Вимкнення бота з підом `{pid}`", quote=should_quote(msg))
killProc(pid)
# ==============================================================================================================================

34
modules/commands/rules.py Normal file
View File

@ -0,0 +1,34 @@
from app import app
from pyrogram import filters
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton
from modules.utils import locale
# Rules command =============================================================================================================
default_rules_markup = InlineKeyboardMarkup(
[
[
InlineKeyboardButton(locale("rules_home", "button"), callback_data="rules_home"),
InlineKeyboardButton(locale("rules_additional", "button"), callback_data="rules_additional")
],
[
InlineKeyboardButton("1", callback_data="rule_1"),
InlineKeyboardButton("2", callback_data="rule_2"),
InlineKeyboardButton("3", callback_data="rule_3")
],
[
InlineKeyboardButton("4", callback_data="rule_4"),
InlineKeyboardButton("5", callback_data="rule_5"),
InlineKeyboardButton("6", callback_data="rule_6")
],
[
InlineKeyboardButton("7", callback_data="rule_7"),
InlineKeyboardButton("8", callback_data="rule_8"),
InlineKeyboardButton("9", callback_data="rule_9")
]
]
)
@app.on_message(~ filters.scheduled & filters.private & filters.command(["rules"], prefixes=["/"]))
async def cmd_rules(app, msg):
await msg.reply_text(locale("rules_msg"), disable_web_page_preview=True, reply_markup=default_rules_markup)
# ==============================================================================================================================

24
modules/commands/start.py Normal file
View File

@ -0,0 +1,24 @@
from app import app
from os import sep
from pyrogram import filters
from modules.utils import jsonLoad, jsonSave, configGet, configSet, logWrite
# Start command ================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & filters.command(["start"], prefixes=["/"]))
async def cmd_start(app, msg):
try:
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage != 0:
return
except FileNotFoundError:
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")
user_stage = configGet("stage", file=str(msg.from_user.id))
configSet(["telegram_id"], str(msg.from_user.username), file=str(msg.from_user.id))
configSet(["telegram_name"], f"{msg.from_user.first_name} {msg.from_user.last_name}", file=str(msg.from_user.id))
configSet(["telegram_phone"], str(msg.from_user.phone_number), file=str(msg.from_user.id))
configSet(["telegram_locale"], str(msg.from_user.language_code), file=str(msg.from_user.id))
logWrite(f"User {msg.from_user.id} started bot interaction")
await msg.reply_text(locale("start", "message"), reply_markup=ReplyKeyboardMarkup(locale("welcome", "keyboard"), resize_keyboard=True)) # type: ignore
# ==============================================================================================================================

20
modules/commands/warn.py Normal file
View File

@ -0,0 +1,20 @@
from os import sep
from app import app, isAnAdmin
from pyrogram import filters
from modules.utils import jsonLoad, jsonSave, configGet
# Warn command =================================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["warn"], prefixes=["/"]))
async def cmd_warn(app, msg):
if msg.chat.id == configGet("destination_group"):
if msg.reply_to_message_id != None:
if isAnAdmin(msg.from_user.id):
warnings = jsonLoad(f"{configGet('data', 'locations')}{sep}warnings.json")
if str(msg.reply_to_message.from_user.id) not in warnings:
warnings[str(msg.reply_to_message.from_user.id)] = 1
else:
warnings[str(msg.reply_to_message.from_user.id)] += 1
jsonSave(warnings, f"{configGet('data', 'locations')}{sep}warnings.json")
await msg.reply_text(locale("warned", "message").format(msg.reply_to_message.from_user.first_name, msg.reply_to_message.from_user.id)) # type: ignore
# ==============================================================================================================================

View File

@ -0,0 +1,41 @@
from os import path, sep
from app import app, isAnAdmin
from pyrogram import filters
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from modules.utils import configGet, jsonLoad, locale, should_quote
# Warnings command =============================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["warnings"], prefixes=["/"]))
async def cmd_warnings(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
warnings = jsonLoad(f"{configGet('data', 'locations')}{sep}warnings.json")
if len(msg.command) <= 1:
await msg.reply_text(locale("syntax_warnings", "message"), quote=should_quote(msg))
if path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json"):
target_id = str(int(msg.command[1]))
target_name = "N/A"
else:
list_of_users = []
async for m in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=msg.command[1]):
list_of_users.append(m)
if len(list_of_users) != 0:
target = list_of_users[0].user
target_name = target.first_name
target_id = str(target.id)
else:
await msg.reply_text(locale("no_user_warnings", "message").format(msg.command[1])) # type: ignore
return
if target_id not in warnings:
await msg.reply_text(locale("no_warnings", "message").format(target_name, target_id), quote=should_quote(msg)) # type: ignore
else:
if warnings[target_id] <= 5:
await msg.reply_text(locale("warnings_1", "message").format(target_name, target_id, warnings[target_id]), quote=should_quote(msg)) # type: ignore
else:
await msg.reply_text(locale("warnings_2", "message").format(target_name, target_id, warnings[target_id]), quote=should_quote(msg)) # type: ignore
# ==============================================================================================================================

View File

@ -0,0 +1,39 @@
from modules.utils import configGet
from pyrogram.types import BotCommand, BotCommandScopeChat
from pyrogram.errors import bad_request_400
def commands_register(app):
# Registering user commands
commands_list = []
for command in configGet("commands"):
commands_list.append(BotCommand(command, configGet("commands")[command]))
app.set_bot_commands(commands_list) # type: ignore
# Registering admin commands
commands_admin_list = []
for command in configGet("commands"):
commands_admin_list.append(BotCommand(command, configGet("commands")[command]))
for command in configGet("commands_admin"):
commands_admin_list.append(BotCommand(command, configGet("commands_admin")[command]))
for admin in configGet("admins"):
try:
app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin)) # type: ignore
except bad_request_400.PeerIdInvalid:
pass
app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner"))) # type: ignore
# Registering admin group commands
commands_group_admin_list = []
for command in configGet("commands_group_admin"):
commands_group_admin_list.append(BotCommand(command, configGet("commands_group_admin")[command]))
app.set_bot_commands(commands_group_admin_list, scope=BotCommandScopeChat(chat_id=configGet("admin_group"))) # type: ignore
# Registering destination group commands
commands_group_destination_list = []
for command in configGet("commands_group_destination"):
commands_group_destination_list.append(BotCommand(command, configGet("commands_group_destination")[command]))
app.set_bot_commands(commands_group_destination_list, scope=BotCommandScopeChat(chat_id=configGet("destination_group"))) # type: ignore

View File

@ -0,0 +1,98 @@
from os import sep
from time import time
from dateutil.relativedelta import relativedelta
from datetime import datetime
from app import app
from pyrogram import filters
from pyrogram.types import ReplyKeyboardRemove, InlineKeyboardMarkup, InlineKeyboardButton
from modules.utils import configGet, configSet, jsonLoad, jsonSave, locale, logWrite
from modules.handlers.welcome import welcome_pass
# Confirmation =================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("confirm", "keyboard")[0][0])))
async def confirm_yes(app, msg):
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage == 10:
if not configGet("sent", file=str(msg.from_user.id)):
await msg.reply_text(locale("application_sent", "message"), reply_markup=ReplyKeyboardRemove())
applications = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")
applications[str(msg.from_user.id)] = {
"approved": False,
"approved_by": None,
"approval_date": None,
"refused": False,
"refused_by": False,
"refusal_date": False,
"application_date": int(time()),
"application": configGet("application", file=str(msg.from_user.id))
}
jsonSave(applications, f"{configGet('data', 'locations')}{sep}applications.json")
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(configGet('application', file=str(msg.from_user.id))['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {configGet('application', file=str(msg.from_user.id))['2']} ({age.years} р.)")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {configGet('application', file=str(msg.from_user.id))[question]}")
i += 1
if configGet("reapply", file=str(msg.from_user.id)):
await app.send_message(chat_id=configGet("admin_group"), text=(locale("reapply_got", "message")).format(str(msg.from_user.id), msg.from_user.first_name, msg.from_user.last_name, msg.from_user.username, "\n".join(application_content)), parse_mode=ParseMode.MARKDOWN, reply_markup=InlineKeyboardMarkup( # type: ignore
[
[
InlineKeyboardButton(text=str(locale("reapply_yes", "button")), callback_data=f"reapply_yes_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("reapply_no", "button")), callback_data=f"reapply_no_{msg.from_user.id}")
]
]
)
)
else:
await app.send_message(chat_id=configGet("admin_group"), text=(locale("application_got", "message")).format(str(msg.from_user.id), msg.from_user.first_name, msg.from_user.last_name, msg.from_user.username, "\n".join(application_content)), parse_mode=ParseMode.MARKDOWN, reply_markup=InlineKeyboardMarkup( # type: ignore
[
[
InlineKeyboardButton(text=str(locale("sub_yes", "button")), callback_data=f"sub_yes_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sub_no", "button")), callback_data=f"sub_no_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sub_no_aggressive", "button")), callback_data=f"sub_no_aggresive_{msg.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sub_no_russian", "button")), callback_data=f"sub_no_russian_{msg.from_user.id}")
]
]
)
)
logWrite(f"User {msg.from_user.id} sent his application and it will now be reviewed")
configSet(["sent"], True, file=str(msg.from_user.id))
configSet(["confirmed"], True, file=str(msg.from_user.id))
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("confirm", "keyboard")[1][0])))
async def confirm_no(app, msg):
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage == 10:
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")
configSet(["telegram_id"], str(msg.from_user.username), file=str(msg.from_user.id))
configSet(["telegram_name"], f"{msg.from_user.first_name} {msg.from_user.last_name}", file=str(msg.from_user.id))
configSet(["telegram_phone"], str(msg.from_user.phone_number), file=str(msg.from_user.id))
configSet(["telegram_locale"], str(msg.from_user.language_code), file=str(msg.from_user.id))
await welcome_pass(app, msg, once_again=True)
logWrite(f"User {msg.from_user.id} restarted the application due to typo in it")
# ==============================================================================================================================

View File

@ -0,0 +1,47 @@
from os import sep, path
from dateutil.relativedelta import relativedelta
from datetime import datetime
from app import app, isAnAdmin
from pyrogram import filters
from modules.utils import configGet, jsonLoad, locale, logWrite
# Contact getting ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.contact & filters.private)
async def get_contact(app, msg):
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")["approved"]) or (await isAnAdmin(msg.from_user.id)):
if msg.contact.user_id != None:
try:
user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.contact.user_id}.json")
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")[str(msg.contact.user_id)]
application_content = []
i = 1
for question in application["application"]:
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(application['application']['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application']['2']} ({age.years} р.)")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application'][question]}")
i += 1
if user_data["sent"]:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_on_hold", "message")
else:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
elif application["refused"]:
application_status = locale("application_status_refused", "message").format((await app.get_users(application["refused_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M")) # type: ignore
else:
application_status = locale("application_status_not_send", "message")
logWrite(f"User {msg.from_user.id} requested application of {msg.contact.user_id}")
await msg.reply_text(locale("contact", "message").format(str(msg.contact.user_id), "\n".join(application_content), application_status)) # type: ignore
except FileNotFoundError:
logWrite(f"User {msg.from_user.id} requested application of {msg.contact.user_id} but user does not exists")
await msg.reply_text(locale("contact_invalid", "message"))
else:
logWrite(f"User {msg.from_user.id} requested application of someone but user is not telegram user")
await msg.reply_text(locale("contact_not_member", "message"))
# ==============================================================================================================================

View File

@ -0,0 +1,81 @@
from datetime import datetime
from app import app
import asyncio
from pyrogram import filters
from pyrogram.types import ForceReply
from modules.utils import configGet, configSet, locale, logWrite
# Any other input ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.private)
async def any_stage(app, msg):
if msg.via_bot is None:
user_stage = configGet("stage", file=str(msg.from_user.id))
if user_stage == 1:
await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))
logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
configSet(["stage"], user_stage+1, file=str(msg.from_user.id))
elif user_stage == 2:
try:
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
input_dt = datetime.strptime(msg.text, "%d.%m.%Y")
if datetime.now() <= input_dt:
logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to joking")
await msg.reply_text(locale("question2_joke", "message"), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply"))))
elif ((datetime.now() - input_dt).days) < ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_allowed"))).days):
logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to being underage")
await msg.reply_text(locale("question2_underage", "message").format(str(configGet("age_allowed"))), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply")))) # type: ignore
else:
logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))
configSet(["stage"], user_stage+1, file=str(msg.from_user.id))
except ValueError:
logWrite(f"User {msg.from_user.id} failed stage {user_stage} due to sending invalid date format")
await msg.reply_text(locale(f"question2_invalid", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage}", "force_reply"))))
else:
if user_stage <= 9:
logWrite(f"User {msg.from_user.id} completed stage {user_stage} of application")
await msg.reply_text(locale(f"question{user_stage+1}", "message"), reply_markup=ForceReply(placeholder=str(locale(f"question{user_stage+1}", "force_reply"))))
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
configSet(["stage"], user_stage+1, file=str(msg.from_user.id))
else:
if not configGet("sent", file=str(msg.from_user.id)):
if not configGet("confirmed", file=str(msg.from_user.id)):
configSet(["application", str(user_stage)], str(msg.text), file=str(msg.from_user.id))
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {configGet('application', file=str(msg.from_user.id))[question]}")
i += 1
await msg.reply_text(locale("confirm", "message").format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard"), resize_keyboard=True)) # type: ignore
#configSet("sent", True, file=str(msg.from_user.id))
#configSet("application_date", int(time()), file=str(msg.from_user.id))
else:
if not configGet("approved", file=str(msg.from_user.id)) and not configGet("refused", file=str(msg.from_user.id)):
await msg.reply_text(locale("already_sent", "message"))
else:
if not configGet("approved", file=str(msg.from_user.id)) and not configGet("refused", file=str(msg.from_user.id)):
await msg.reply_text(locale("already_sent", "message"))
@app.on_message(~ filters.scheduled & filters.group)
async def message_in_group(app, msg):
if (msg.chat is not None) and (msg.via_bot is not None):
if (msg.via_bot.id == configGet("bot_id")) and (msg.chat.id == configGet("destination_group")):
if configGet("remove_application_time") > 0:
logWrite(f"User {msg.from_user.id} requested application in destination group, removing in {configGet('remove_application_time')} minutes")
await asyncio.sleep(configGet("remove_application_time")*60)
await msg.delete()
logWrite(f"Removed application requested by {msg.from_user.id} in destination group")
# ==============================================================================================================================

View File

@ -0,0 +1,33 @@
from os import sep, path
from app import app, isAnAdmin
from pyrogram.types import ChatPermissions, InlineKeyboardMarkup, InlineKeyboardButton
from modules.utils import configGet, jsonLoad, locale
# Filter users on join =========================================================================================================
@app.on_chat_member_updated(group=configGet("destination_group"))
#@app.on_message(filters.new_chat_members, group=configGet("destination_group"))
async def filter_join(app, member):
if member.invite_link != None:
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json")["approved"]) or (await isAnAdmin(member.from_user.id)):
if configGet("link", file=str(member.from_user.id)) == member.invite_link.invite_link:
return
if await isAnAdmin(member.invite_link.creator.id):
return
await app.send_message(configGet("admin_group"), f"User **{member.from_user.first_name}** (`{member.from_user.id}`) joined the chat not with his personal link", reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(text=str(locale("sus_allow", "button")), callback_data=f"sus_allow_{member.from_user.id}")
],
[
InlineKeyboardButton(text=str(locale("sus_refuse", "button")), callback_data=f"sus_refuse_{member.from_user.id}")
]
]
))
await app.restrict_chat_member(member.chat.id, member.from_user.id, permissions=ChatPermissions(
can_send_messages=False,
can_send_media_messages=False,
can_send_other_messages=False,
can_send_polls=False
)
)
# ==============================================================================================================================

View File

@ -0,0 +1,29 @@
from app import app
from pyrogram import filters
from modules.utils import configGet, configSet, locale, logWrite
# Welcome check ================================================================================================================
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("welcome", "keyboard")[0][0]) | filters.regex(locale("return", "keyboard")[0][0])))
async def welcome_pass(app, msg, once_again: bool = True) -> None:
"""Set user's stage to 1 and start a fresh application
### Args:
* app (app): Pyrogram Client to use
* msg (Message): Message with .from_user.id attribute equal to the end-user ID whose application will be started
* once_again (bool, optional): Set to False if it's the first time as user applies. Defaults to True.
"""
if not once_again:
await msg.reply_text(locale("privacy_notice", "message"))
logWrite(f"User {msg.from_user.id} confirmed starting the application")
await msg.reply_text(locale("question1", "message"), reply_markup=ForceReply(placeholder=locale("question1", "force_reply"))) # type: ignore
configSet(["stage"], 1, file=str(msg.from_user.id))
configSet(["sent"], False, file=str(msg.from_user.id))
@app.on_message(~ filters.scheduled & filters.private & (filters.regex(locale("welcome", "keyboard")[1][0])))
async def welcome_reject(app, msg):
logWrite(f"User {msg.from_user.id} refused to start the application")
await msg.reply_text(locale("goodbye", "message"), reply_markup=ReplyKeyboardMarkup(locale("return", "keyboard"), resize_keyboard=True)) # type: ignore
# ==============================================================================================================================

View File

@ -1,4 +1,5 @@
from typing import Any, Union
from pyrogram.enums.chat_type import ChatType
from ujson import JSONDecodeError as JSONDecodeError
from ujson import loads, dumps
@ -140,4 +141,7 @@ def killProc(pid):
kill(pid, SIGKILL)
else:
p = Process(pid)
p.kill()
p.kill()
def should_quote(msg):
return True if msg.chat.type is not ChatType.PRIVATE else False