Compare commits

...

6 Commits

15 changed files with 133 additions and 91 deletions

View File

@ -54,11 +54,11 @@ class HoloUser():
as well as provide better programming experience in case of adding new features, etc.
"""
def __init__(self, user: Union[User, List[User], ChatMember, int, str]) -> None:
def __init__(self, user: Union[User, List[User], ChatMember, int]) -> None:
"""A user of Holo bot. Used to simplify DB interaction.
### Args:
* user (`Union[User, List[User], ChatMember, int, str]`): Any possible way to identify the user. Pass `User` object, user's telegram ID, username or something...
* user (`Union[User, List[User], ChatMember, int]`): Any possible way to identify the user. Pass `User` object, user's telegram ID, etc...
### Raises:
* UserInvalidError: Provided to `HoloUser` object is not supported
@ -74,13 +74,14 @@ class HoloUser():
self.id = user.user.id
elif isinstance(user, int):
self.id = user
elif isinstance(user, str):
try:
self.id = (app.get_users(user)).id # this line requires testing though
except bad_request_400.UsernameNotOccupied:
raise UserInvalidError(user)
except bad_request_400.PeerIdInvalid:
raise UserInvalidError(user)
# elif isinstance(user, str):
# try:
# get_users = async_to_sync(app.get_users)
# self.id = get_users(user).id # this line requires testing though
# except bad_request_400.UsernameNotOccupied:
# raise UserInvalidError(user)
# except bad_request_400.PeerIdInvalid:
# raise UserInvalidError(user)
else:
raise UserInvalidError(user)
@ -244,7 +245,7 @@ class HoloUser():
self.label = label
self.set("label", label)
await app.promote_chat_member(configGet("destination_group"), self.id)
if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
if not await isAnAdmin(self.id):
await app.set_administrator_title(configGet("destination_group"), self.id, label)
async def reset_label(self, chat: Chat) -> None:
@ -256,7 +257,7 @@ class HoloUser():
self.label = ""
self.set("label", "")
await app.set_administrator_title(configGet("destination_group"), self.id, "")
if (not await isAnAdmin(self.id)) and (chat.id == configGet("admin_group")):
if not await isAnAdmin(self.id):
await app.promote_chat_member(configGet("destination_group"), self.id, privileges=ChatPrivileges(
can_manage_chat=False
))

View File

@ -42,7 +42,7 @@
"application_status_rejected": "Відхилена `{0}` від {1}",
"application_status_on_hold": "Анкета все ще на розгляді",
"application_status_not_send": "Анкета ще не була відправлена",
"contact_invalid": "Надісланий контакт не має розпочатої анкети.",
"contact_invalid": "Надісланий контакт не має завершеної анкети.",
"contact_not_member": "Надісланий контакт не є користувачем Telegram.",
"already_sent": "Анкету вже надіслано, просто почекай. Тобі одразу повідомлять, яке рішення буде прийнято.",
"sus_joined": "Користувач **{0}** (`{1}`) зайшов до групи не за своїм персональним запрошенням.",
@ -68,6 +68,8 @@
"message_reply_notice": "\n\n**Щоб надіслати відповідь на це повідомлення, тегніть його.**",
"message_error": "⚠️ **Сталась помилка**\nНе вдалось надіслати ваше повідомлення. Розробника повідомлено про цю помилку.",
"message_traceback": "⚠️ **Сталась помилка**\nПомилка повідомлень: `{0}` -> `{1}`\nПомилка: `{2}`\n\nTraceback:\n```\n{3}\n```",
"no_user_application": "Не знайдено користувачів за запитом **{0}**",
"user_invalid": "Надісланий користувач не має завершеної анкети.",
"question_titles": {
"question1": "Ім'я/звертання:",
"question2": "День народження:",

View File

@ -1,24 +1,26 @@
from os import sep
from app import app
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, ChatPermissions
from pyrogram import filters
from modules.utils import configGet, configSet, jsonLoad, jsonSave, locale, logWrite
from classes.holo_user import HoloUser
from modules.utils import configGet, locale, logWrite
from modules.database import col_tmp
# Callbacks sus users ==========================================================================================================
@app.on_callback_query(filters.regex("sus_allow_[\s\S]*"))
async def callback_query_sus_allow(app, clb):
fullclb = clb.data.split("_")
holo_user = HoloUser(int(fullclb[2]))
await app.send_message(configGet("admin_group"), locale("sus_allowed_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True)
logWrite(f"User {fullclb[2]} was allowed to join with another link by {clb.from_user.id}")
await app.send_message(configGet("admin_group"), locale("sus_allowed_by", "message").format(clb.from_user.first_name, holo_user.id), disable_notification=True)
logWrite(f"User {holo_user.id} was allowed to join with another link by {clb.from_user.id}")
edited_markup = [[InlineKeyboardButton(text=str(locale("sus_allowed", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sus_allowed", "callback").format(fullclb[2]), show_alert=True)
await clb.answer(text=locale("sus_allowed", "callback").format(holo_user.id), show_alert=True)
await app.restrict_chat_member(configGet("destination_group"), int(fullclb[2]), permissions=ChatPermissions(
await app.restrict_chat_member(configGet("destination_group"), holo_user.id, permissions=ChatPermissions(
can_send_messages=True,
can_send_media_messages=True,
can_send_other_messages=True,
@ -30,19 +32,17 @@ async def callback_query_sus_allow(app, clb):
async def callback_query_sus_reject(app, clb):
fullclb = clb.data.split("_")
holo_user = HoloUser(int(fullclb[2]))
await app.send_message(configGet("admin_group"), locale("sus_rejected_by", "message").format(clb.from_user.first_name, fullclb[2]), disable_notification=True)
logWrite(f"User {fullclb[2]} was rejected to join with another link by {clb.from_user.id}")
await app.send_message(configGet("admin_group"), locale("sus_rejected_by", "message").format(clb.from_user.first_name, holo_user.id), disable_notification=True)
logWrite(f"User {holo_user.id} was rejected to join with another link by {clb.from_user.id}")
edited_markup = [[InlineKeyboardButton(text=str(locale("sus_rejected", "button")), callback_data="nothing")]]
await clb.message.edit(text=clb.message.text, reply_markup=InlineKeyboardMarkup(edited_markup))
await clb.answer(text=locale("sus_rejected", "callback").format(fullclb[2]), show_alert=True)
await clb.answer(text=locale("sus_rejected", "callback").format(holo_user.id), show_alert=True)
await app.ban_chat_member(configGet("destination_group"), int(fullclb[2]))
await app.ban_chat_member(configGet("destination_group"), holo_user.id)
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}user_default.json"), f"{configGet('data', 'locations')}{sep}users{sep}{fullclb[2]}.json")
configSet(["stage"], 10, file=fullclb[2])
configSet(["rejected"], True, file=fullclb[2])
configSet(["rejected_by"], clb.from_user.id, file=fullclb[2])
col_tmp.update_one({"user": {"$eq": holo_user.id}, "type": {"$eq": "application"}}, {"$set": {"state": "rejected", "sent": False}})
# ==============================================================================================================================

View File

@ -1,55 +1,92 @@
from os import sep, path
from datetime import datetime
from app import app, isAnAdmin
from pyrogram import filters
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from modules.utils import configGet, jsonLoad, logWrite, locale, should_quote
from pyrogram.enums.parse_mode import ParseMode
from pyrogram.errors import bad_request_400
from classes.holo_user import HoloUser
from modules.utils import configGet, logWrite, locale, should_quote
from dateutil.relativedelta import relativedelta
from modules.database import col_applications
# Applications command =========================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["application"], prefixes=["/"]))
async def cmd_application(app, msg):
if (await isAnAdmin(msg.from_user.id)) or (msg.chat.id == configGet("admin_group")):
if await isAnAdmin(msg.from_user.id) is True:
try:
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json")["approved"]):
user_id = int(msg.command[1])
else:
list_of_users = []
async for m in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=msg.command[1]):
list_of_users.append(m)
user_id = list_of_users[0].user.id
try:
user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{user_id}.json")
application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")[str(user_id)]
application_content = []
i = 1
for question in configGet("application", file=str(msg.from_user.id)):
if i == 2:
age = relativedelta(datetime.now(), datetime.strptime(application['application']['2'], '%d.%m.%Y'))
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application']['2']} ({age.years} р.)")
holo_user = HoloUser(int(msg.command[1]))
except ValueError:
try:
holo_user = HoloUser((await app.get_users(msg.command[1])).id)
except bad_request_400.UsernameInvalid:
await msg.reply_text(locale("no_user_application", "message").format(msg.command[1]), quote=should_quote(msg))
application = col_applications.find_one({"user": holo_user.id})
application_content = []
i = 1
if application is None:
logWrite(f"User {msg.from_user.id} requested application of {holo_user.id} but user does not exists")
await msg.reply_text(locale("user_invalid", "message"), quote=should_quote(msg))
return
for question in application['application']:
if i == 2:
age = relativedelta(datetime.now(), application['application']['2'])
application_content.append(f"{locale(f'question{i}', 'message', 'question_titles')} {application['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)")
elif i == 3:
if application['application']['3']['countryCode'] == "UA":
application_content.append(f"{locale(f'question{i}', 'message', 'question_titles')} {application['application']['3']['name']}")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application'][question]}")
i += 1
if user_data["sent"]:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M"))
elif application["rejected"]:
application_status = locale("application_status_rejected", "message").format((await app.get_users(application["rejected_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M"))
else:
application_status = locale("application_status_on_hold", "message")
application_content.append(f"{locale(f'question{i}', 'message', 'question_titles')} {application['application']['3']['name']} ({application['application']['3']['adminName1']}, {application['application']['3']['countryName']})")
else:
if user_data["approved"]:
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M"))
elif application["rejected"]:
application_status = locale("application_status_rejected", "message").format((await app.get_users(application["rejected_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M"))
else:
application_status = locale("application_status_not_send", "message")
logWrite(f"User {msg.from_user.id} requested application of {user_id}")
await msg.reply_text(locale("contact", "message").format(str(user_id), "\n".join(application_content), application_status), quote=should_quote(msg))
except FileNotFoundError:
logWrite(f"User {msg.from_user.id} requested application of {user_id} but user does not exists")
await msg.reply_text(locale("contact_invalid", "message"), quote=should_quote(msg))
application_content.append(f"{locale(f'question{i}', 'message', 'question_titles')} {application['application'][question]}")
i += 1
application_status = locale("application_status_accepted", "message").format((await app.get_users(application["admin"])).first_name, application["date"].strftime("%d.%m.%Y, %H:%M"))
await msg.reply_text(locale("contact", "message").format(holo_user.id, "\n".join(application_content), application_status), parse_mode=ParseMode.MARKDOWN, quote=should_quote(msg))
# if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.command[1]}.json")["approved"]):
# user_id = int(msg.command[1])
# else:
# list_of_users = []
# async for m in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=msg.command[1]):
# list_of_users.append(m)
# user_id = list_of_users[0].user.id
# try:
# user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{user_id}.json")
# application = jsonLoad(f"{configGet('data', 'locations')}{sep}applications.json")[str(user_id)]
# application_content = []
# i = 1
# for question in configGet("application", file=str(msg.from_user.id)):
# if i == 2:
# age = relativedelta(datetime.now(), datetime.strptime(application['application']['2'], '%d.%m.%Y'))
# application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application']['2']} ({age.years} р.)")
# else:
# application_content.append(f"{locale('question'+str(i), 'message', 'question_titles')} {application['application'][question]}")
# i += 1
# if user_data["sent"]:
# if user_data["approved"]:
# application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M"))
# elif application["rejected"]:
# application_status = locale("application_status_rejected", "message").format((await app.get_users(application["rejected_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M"))
# else:
# application_status = locale("application_status_on_hold", "message")
# else:
# if user_data["approved"]:
# application_status = locale("application_status_accepted", "message").format((await app.get_users(application["approved_by"])).first_name, datetime.fromtimestamp(application["approval_date"]).strftime("%d.%m.%Y, %H:%M"))
# elif application["rejected"]:
# application_status = locale("application_status_rejected", "message").format((await app.get_users(application["rejected_by"])).first_name, datetime.fromtimestamp(application["refusal_date"]).strftime("%d.%m.%Y, %H:%M"))
# else:
# application_status = locale("application_status_not_send", "message")
# logWrite(f"User {msg.from_user.id} requested application of {user_id}")
# await msg.reply_text(locale("contact", "message").format(str(user_id), "\n".join(application_content), application_status), quote=should_quote(msg))
except IndexError:
await msg.reply_text(locale("application_invalid_syntax", "message"), quote=should_quote(msg))

View File

@ -10,7 +10,7 @@ from modules.database import col_applications
@app.on_message(~ filters.scheduled & filters.command(["applications"], prefixes=["/"]))
async def cmd_applications(app, msg):
if (await isAnAdmin(msg.from_user.id)) or (msg.chat.id == configGet("admin_group")):
if await isAnAdmin(msg.from_user.id) is True:
await app.send_chat_action(msg.chat.id, ChatAction.UPLOAD_DOCUMENT)
filename = uuid1()
output = []

View File

@ -6,7 +6,7 @@ from classes.holo_user import HoloUser
@app.on_message(~ filters.scheduled & filters.private & filters.command(["label"], prefixes=["/"]))
async def cmd_label(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
if await isAnAdmin(msg.from_user.id) is True:
if len(msg.command) < 3:
await msg.reply_text("Invalid syntax:\n`/label USER LABEL`")

View File

@ -7,7 +7,7 @@ from modules.utils import logWrite, locale, configGet, should_quote
@app.on_message(~ filters.scheduled & filters.command(["message"], prefixes=["/"]))
async def cmd_message(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
if await isAnAdmin(msg.from_user.id) is True:
try:

View File

@ -11,7 +11,7 @@ pid = getpid()
@app.on_message(~ filters.scheduled & filters.private & filters.command(["kill", "die", "reboot"], prefixes=["/"]))
async def cmd_kill(app, msg):
if msg.chat.id == configGet("admin_group") or await isAnAdmin(msg.from_user.id):
if await isAnAdmin(msg.from_user.id) is True:
logWrite(f"Shutting down bot with pid {pid}")
await msg.reply_text(f"Вимкнення бота з підом `{pid}`", quote=should_quote(msg))
scheduler.shutdown()

View File

@ -1,23 +1,23 @@
from datetime import datetime
from os import path, sep
from app import app
from pyrogram import filters
from modules.utils import configGet, jsonLoad, jsonSave
from modules.utils import should_quote
# Sponsorship command ==========================================================================================================
@app.on_message(~ filters.scheduled & filters.command(["sponsorship"], prefixes=["/"]))
async def cmd_sponsorship(app, msg):
if not path.exists(f"{configGet('data', 'locations')}{sep}sponsors{sep}{msg.from_user.id}.json"):
jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}sponsor_default.json"), f"{configGet('data', 'locations')}{sep}sponsors{sep}{msg.from_user.id}.json")
sponsor = jsonLoad(f"{configGet('data', 'locations')}{sep}sponsors{sep}{msg.from_user.id}.json")
if sponsor["approved"]:
if sponsor["expires"] is not None:
if datetime.strptime(sponsor["expires"], "%d.%m.%Y") > datetime.now():
await msg.reply_text(f"You have an active sub til **{sponsor['expires']}**.")
else:
await msg.reply_text(f"Your sub expired {int((datetime.now()-datetime.strptime(sponsor['expires'], '%d.%m.%Y')).days)} days ago.")
elif sponsor["approved"]:
await msg.reply_text(f"Your sub expiration date is not valid.")
else:
await msg.reply_text(f"You have no active subscription.")
await msg.reply_text("Yes, I exist.", quote=should_quote(msg))
# if not path.exists(f"{configGet('data', 'locations')}{sep}sponsors{sep}{msg.from_user.id}.json"):
# jsonSave(jsonLoad(f"{configGet('data', 'locations')}{sep}sponsor_default.json"), f"{configGet('data', 'locations')}{sep}sponsors{sep}{msg.from_user.id}.json")
# sponsor = jsonLoad(f"{configGet('data', 'locations')}{sep}sponsors{sep}{msg.from_user.id}.json")
# if sponsor["approved"]:
# if sponsor["expires"] is not None:
# if datetime.strptime(sponsor["expires"], "%d.%m.%Y") > datetime.now():
# await msg.reply_text(f"You have an active sub til **{sponsor['expires']}**.")
# else:
# await msg.reply_text(f"Your sub expired {int((datetime.now()-datetime.strptime(sponsor['expires'], '%d.%m.%Y')).days)} days ago.")
# elif sponsor["approved"]:
# await msg.reply_text(f"Your sub expiration date is not valid.")
# else:
# await msg.reply_text(f"You have no active subscription.")
# ==============================================================================================================================

View File

@ -10,7 +10,7 @@ async def cmd_warn(app, msg):
if msg.chat.id == configGet("destination_group"):
if msg.reply_to_message_id != None:
if await isAnAdmin(msg.from_user.id):
if await isAnAdmin(msg.from_user.id) is True:
message = " ".join(msg.command[1:]) if len(msg.command) > 1 else ""
col_warnings.insert_one({"user": msg.reply_to_message.from_user.id, "admin": msg.from_user.id, "date": datetime.now(), "reason": message})
if message == "":

View File

@ -8,7 +8,7 @@ from modules.database import col_users, col_warnings
@app.on_message(~ filters.scheduled & filters.command(["warnings"], prefixes=["/"]))
async def cmd_warnings(app, msg):
if await isAnAdmin(msg.from_user.id):
if await isAnAdmin(msg.from_user.id) is True:
if len(msg.command) <= 1:
await msg.reply_text(locale("syntax_warnings", "message"), quote=should_quote(msg))

View File

@ -53,7 +53,8 @@ async def confirm_yes(app, msg):
InlineKeyboardButton(text=str(locale("reapply_no", "button")), callback_data=f"reapply_no_{holo_user.id}")
]
]
)
),
parse_mode=ParseMode.MARKDOWN
)
else:
await app.send_message(chat_id=configGet("admin_group"), text=(locale("application_got", "message")).format(str(holo_user.id), msg.from_user.first_name, msg.from_user.last_name, msg.from_user.username, "\n".join(application_content)), parse_mode=ParseMode.MARKDOWN, reply_markup=InlineKeyboardMarkup(
@ -71,7 +72,8 @@ async def confirm_yes(app, msg):
InlineKeyboardButton(text=str(locale("sub_russian", "button")), callback_data=f"sub_russian_{holo_user.id}")
]
]
)
),
parse_mode=ParseMode.MARKDOWN
)
logWrite(f"User {holo_user.id} sent his application and it will now be reviewed")

View File

@ -8,7 +8,7 @@ from modules.utils import configGet, jsonLoad, locale, logWrite
# Contact getting ==============================================================================================================
@app.on_message(~ filters.scheduled & filters.contact & filters.private)
async def get_contact(app, msg):
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")["approved"]) or (await isAnAdmin(msg.from_user.id)):
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.from_user.id}.json")["approved"]) or (await isAnAdmin(msg.from_user.id) is True):
if msg.contact.user_id != None:
try:
user_data = jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{msg.contact.user_id}.json")

View File

@ -8,7 +8,7 @@ from modules.utils import configGet, jsonLoad, locale
#@app.on_message(filters.new_chat_members, group=configGet("destination_group"))
async def filter_join(app, member):
if member.invite_link != None:
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json")["approved"]) or (await isAnAdmin(member.from_user.id)):
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{member.from_user.id}.json")["approved"]) or (await isAnAdmin(member.from_user.id) is True):
if configGet("link", file=str(member.from_user.id)) == member.invite_link.invite_link:
return
if await isAnAdmin(member.invite_link.creator.id):

View File

@ -25,7 +25,7 @@ async def inline_answer(client, inline_query):
)
return
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{inline_query.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{inline_query.from_user.id}.json")["approved"]) or (await isAnAdmin(inline_query.from_user.id)):
if (path.exists(f"{configGet('data', 'locations')}{sep}users{sep}{inline_query.from_user.id}.json") and jsonLoad(f"{configGet('data', 'locations')}{sep}users{sep}{inline_query.from_user.id}.json")["approved"]) or (await isAnAdmin(inline_query.from_user.id) is True):
list_of_users = []
async for m in app.get_chat_members(configGet("destination_group"), limit=configGet("inline_preview_count"), filter=ChatMembersFilter.SEARCH, query=inline_query.query):