392 lines
21 KiB
Python
392 lines
21 KiB
Python
from datetime import datetime
|
||
from requests import get
|
||
from traceback import print_exc
|
||
from app import app, isAnAdmin
|
||
from typing import Any, List, Literal, Union
|
||
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice, ForceReply, ReplyKeyboardMarkup
|
||
from pyrogram.errors import bad_request_400
|
||
from dateutil.relativedelta import relativedelta
|
||
from modules.database import col_tmp, col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
|
||
from modules.logging import logWrite
|
||
from modules.utils import configGet, locale, should_quote
|
||
|
||
class DefaultApplicationTemp(dict):
    """Blank application draft for a single user.

    The template is available through the `dict` attribute (the form that is
    inserted into the tmp collection elsewhere in this file) and is also loaded
    into the mapping itself, so the instance is not an always-empty `dict`.
    """

    def __init__(self, user: int):
        """Create an empty draft.

        ### Args:
            * user (`int`): User identifier stored under the `"user"` key
        """
        super().__init__(self._blank(user))
        # Built as a separate object so that changes made to `.dict` (or to its
        # nested "application" mapping) never leak into the instance's own items.
        self.dict = self._blank(user)

    @staticmethod
    def _blank(user: int) -> dict:
        """Return a fresh template with all ten answers unset."""
        return {
            "user": user,
            "type": "application",
            "complete": False,
            "sent": False,
            "state": "fill",
            "reapply": False,
            "stage": 1,
            # Answers are keyed by the question number as a string: "1".."10"
            "application": {str(number): None for number in range(1, 11)},
        }
|
||
|
||
class UserNotFoundError(Exception):
    """Raised when HoloUser finds no database record for the given ID"""

    def __init__(self, user, user_id):
        """Remember what was looked up and build the error text.

        ### Args:
            * user: Object that was passed in to identify the user
            * user_id: ID that was searched for
        """
        self.user, self.user_id = user, user_id
        description = f"User of type {type(user)} with id {user_id} was not found"
        super().__init__(description)
|
||
|
||
class UserInvalidError(Exception):
    """Raised when the object given to HoloUser is of an unsupported type"""

    def __init__(self, user):
        """Keep the rejected object and build the error text.

        ### Args:
            * user: Object that could not be used to identify a user
        """
        self.user = user
        description = f"Could not find HoloUser by using {type(user)} as an input type"
        super().__init__(description)
|
||
|
||
class HoloUser():
|
||
"""This object represents a user of HoloChecker bot.
|
||
It is primarily used to interact with a database in a more python-friendly way,
|
||
as well as provide better programming experience in case of adding new features, etc.
|
||
"""
|
||
|
||
def __init__(self, user: Union[User, List[User], ChatMember, int]) -> None:
    """A user of Holo bot. Used to simplify DB interaction.

    ### Args:
        * user (`Union[User, List[User], ChatMember, int]`): Any possible way to identify the user. Pass `User` object, user's telegram ID, etc...

    ### Raises:
        * UserInvalidError: The object provided to `HoloUser` is not of a supported type
        * UserNotFoundError: `HoloUser` could not find user with such an ID in database
    """

    # Determine input object class and extract id
    if isinstance(user, list) and len(user) != 0:
        self.id = user[0].id
    elif isinstance(user, User):
        self.id = user.id
    elif isinstance(user, ChatMember):
        self.id = user.user.id
    elif isinstance(user, int):
        self.id = user
    else:
        # TODO: resolving a username string through app.get_users was drafted
        # here but never enabled or tested, so strings (and empty lists) are rejected.
        raise UserInvalidError(user)

    # Find user record in DB
    holo_user = col_users.find_one({"user": self.id})

    if holo_user is None:
        raise UserNotFoundError(user=user, user_id=self.id)

    self.db_id = holo_user["_id"]

    self.link = holo_user["link"]
    self.label = holo_user["label"]
    self.name = holo_user["tg_name"]
    self.phone = holo_user["tg_phone"]
    self.locale = holo_user["tg_locale"]
    self.username = holo_user["tg_username"]

    # Only a User object carries profile data the stored record can be compared with
    if isinstance(user, User):

        # A missing first name must not overwrite the stored one
        if (self.name != user.first_name) and (user.first_name is not None):
            self.set("tg_name", user.first_name)

        if self.phone != user.phone_number:
            self.set("tg_phone", user.phone_number)

        # A missing language code must not overwrite the stored one
        if (self.locale != user.language_code) and (user.language_code is not None):
            self.set("tg_locale", user.language_code)

        if self.username != user.username:
            self.set("tg_username", user.username)

def set(self, key: str, value: Any) -> None:
    """Set attribute data and save it into database

    ### Args:
        * `key` (`str`): Attribute to be changed. The database field names `tg_name`, `tg_phone`, `tg_locale` and `tg_username` are accepted as well and update `name`, `phone`, `locale` and `username` respectively
        * `value` (`Any`): Value to set

    ### Raises:
        * AttributeError: The object has no attribute matching `key`
    """
    # Database fields that __init__ exposes under a shorter attribute name.
    # Without this translation every `set("tg_...")` call failed the hasattr check.
    field_attributes = {
        "tg_name": "name",
        "tg_phone": "phone",
        "tg_locale": "locale",
        "tg_username": "username",
    }
    attribute = field_attributes.get(key, key)

    if not hasattr(self, attribute):
        raise AttributeError(f"HoloUser has no attribute matching key '{key}'")

    setattr(self, attribute, value)
    # The database is always written under the key exactly as it was passed in
    col_users.update_one(filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True)
    logWrite(f"Set attribute {key} of user {self.id} to {value}")
|
||
|
||
async def message(self,
    context: Message,
    origin: Union[Message, None] = None,
    text: Union[str, None] = None,
    caption: Union[str, None] = None,
    photo: Union[str, Photo, None] = None,
    video: Union[str, Video, None] = None,
    file: Union[str, Document, None] = None,
    animation: Union[str, Animation, None] = None,
    voice: Union[str, Voice, None] = None,
    adm_origin: bool = False,
    adm_context: bool = False
) -> None:
    """Send a message to user

    Only one kind of content is sent per call; the first one provided wins in
    this order: photo, video, file, animation, voice, plain text. Any failure
    while sending is logged, reported to the configured owner and answered
    with an error reply to `context`; it is not re-raised.

    ### Args:
        * context (`Message`): Context (mostly the message where this method is called)
        * origin (`Union[Message, None]`, *optional*): Origin message where to refer. None if called in a command. Defaults to None.
        * text (`Union[str, None]`, *optional*): Text if this is a simple text message. Defaults to None.
        * caption (`Union[str, None]`, *optional*): Text if this is a media message. Defaults to None.
        * photo (`Union[str, Photo, None]`, *optional*): Photo as a photo object or file_id as a string. Defaults to None.
        * video (`Union[str, Video, None]`, *optional*): Video as a video object or file_id as a string. Defaults to None.
        * file (`Union[str, Document, None]`, *optional*): File as a document object or file_id as a string. Defaults to None.
        * animation (`Union[str, Animation, None]`, *optional*): Animation as an animation object or file_id as a string. Defaults to None.
        * voice (`Union[str, Voice, None]`, *optional*): Voice as a voice object or file_id as a string. Defaults to None.
        * adm_origin (`bool`, *optional*): Whether origin sender is an admin. Defaults to False.
        * adm_context (`bool`, *optional*): Whether context sender is an admin. Defaults to False.
    """
    # Imported locally: print_exc() only prints and returns None, so it cannot
    # be used to embed the traceback into a string.
    from traceback import format_exc, print_exc

    # Check if any text available and log message sending
    if text is not None:
        logWrite(f"{context.from_user.id} sent message '{text}' to {self.id}")
    elif caption is not None:
        logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id}")
    else:
        logWrite(f"{context.from_user.id} sent message to {self.id}")

    # Add notices for admin or user
    # NOTE(review): when neither admin flag is set, the text/caption is replaced
    # by the reply notice alone rather than extended with it - confirm this is intended.
    if text is not None:
        if adm_context:
            text += locale("message_reply_notice", "message")
        elif adm_origin:
            text = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + text + locale("message_reply_notice", "message")
        else:
            text = locale("message_reply_notice", "message")

    if caption is not None:
        if adm_context:
            caption += locale("message_reply_notice", "message")
        elif adm_origin:
            caption = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + caption + locale("message_reply_notice", "message")
        else:
            caption = locale("message_reply_notice", "message")

    # Try sending the message
    try:

        # Check if origin message exists
        # This check decides whether we send_ a message or reply_ to one
        if origin is not None:

            if photo is not None:
                if isinstance(photo, Photo):
                    photo = photo.file_id
                new_message = await origin.reply_photo(photo, caption=caption, quote=True)
            elif video is not None:
                if isinstance(video, Video):
                    video = video.file_id
                new_message = await origin.reply_video(video, caption=caption, quote=True)
            elif file is not None:
                if isinstance(file, Document):
                    file = file.file_id
                new_message = await origin.reply_document(file, caption=caption, quote=True)
            elif animation is not None:
                if isinstance(animation, Animation):
                    animation = animation.file_id
                new_message = await origin.reply_animation(animation, caption=caption, quote=True)
            elif voice is not None:
                if isinstance(voice, Voice):
                    voice = voice.file_id
                new_message = await origin.reply_voice(voice, caption=caption, quote=True)
            else:
                new_message = await origin.reply_text(text, quote=True)

        else:

            if photo is not None:
                if isinstance(photo, Photo):
                    photo = photo.file_id
                new_message = await app.send_photo(self.id, photo, caption=caption)
            elif video is not None:
                if isinstance(video, Video):
                    video = video.file_id
                new_message = await app.send_video(self.id, video, caption=caption)
            elif file is not None:
                if isinstance(file, Document):
                    file = file.file_id
                new_message = await app.send_document(self.id, file, caption=caption)
            elif animation is not None:
                if isinstance(animation, Animation):
                    animation = animation.file_id
                # send_* methods need the target chat first and have no `quote` argument
                new_message = await app.send_animation(self.id, animation, caption=caption)
            elif voice is not None:
                if isinstance(voice, Voice):
                    voice = voice.file_id
                new_message = await app.send_voice(self.id, voice, caption=caption)
            else:
                new_message = await app.send_message(self.id, text)

        # Acknowledge sending a message and save entry into DB
        await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))
        col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})

    # Report to admin and to sender about message sending failure
    except Exception as exp:
        trace = format_exc()
        # Keep the traceback on stderr, as the original print_exc() calls did
        print_exc()
        logWrite(f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{trace}")
        try:
            await app.send_message(configGet("owner"), locale("message_traceback", "message").format(context.from_user.id, self.id, exp, trace))
        except bad_request_400.PeerIdInvalid:
            logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
        except Exception as notify_exp:
            # The report now carries a real traceback and may itself be rejected
            # (e.g. for its length); that must not stop the sender from being told.
            logWrite(f"Could not notify admin about failure when sending message: {notify_exp}")
        await context.reply_text(locale("message_error", "message"), quote=should_quote(context))
|
||
|
||
async def set_label(self, chat: Chat, label: str) -> None:
    """Save a label for the user and apply it as an administrator title

    The user is promoted in the group read from the `destination_group`
    config entry; the title is then set only when `isAnAdmin` reports false
    for this user.

    ### Args:
        * chat (`Chat`): Telegram chat (not used by this method)
        * label (`str`): Label you want to set
    """
    self.label = label
    self.set("label", label)

    await app.promote_chat_member(configGet("destination_group"), self.id)

    # Users for whom isAnAdmin is true get no title from here
    if await isAnAdmin(self.id):
        return

    await app.set_administrator_title(configGet("destination_group"), self.id, label)
|
||
|
||
async def reset_label(self, chat: Chat) -> None:
    """Clear the user's label and administrator title

    The stored label and the title in the group read from the
    `destination_group` config entry are set to an empty string. When
    `isAnAdmin` reports false for this user, the member is additionally
    re-promoted with `can_manage_chat` disabled.

    ### Args:
        * chat (`Chat`): Telegram chat (not used by this method)
    """
    self.label = ""
    self.set("label", "")

    await app.set_administrator_title(configGet("destination_group"), self.id, "")

    # Users for whom isAnAdmin is true keep their privileges
    if await isAnAdmin(self.id):
        return

    revoked = ChatPrivileges(can_manage_chat=False)
    await app.promote_chat_member(configGet("destination_group"), self.id, privileges=revoked)
|
||
|
||
def application_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
    """Look up the state of the user's application draft in tmp collection

    ### Returns:
        * `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: The draft's `state` and `complete` values, or `("none", False)` when the user has no draft.
    """
    draft = col_tmp.find_one({"user": self.id, "type": "application"})

    if draft is not None:
        return draft["state"], draft["complete"]

    return "none", False
|
||
|
||
def application_approved(self) -> bool:
    """Tell whether the applications collection holds a record for this user

    ### Returns:
        * `bool`: `True` if a record exists and `False` otherwise
    """
    record = col_applications.find_one({"user": self.id})
    return record is not None
|
||
|
||
def application_restart(self) -> None:
    """Replace the user's application draft in tmp collection with a blank one

    An existing draft is deleted first; a fresh default draft is inserted in
    either case.
    """
    if col_tmp.find_one({"user": self.id, "type": "application"}) is not None:
        col_tmp.delete_one({"user": self.id, "type": "application"})

    col_tmp.insert_one(document=DefaultApplicationTemp(self.id).dict)
|
||
|
||
async def application_next(self, query: str, msg: Message) -> None:
|
||
"""Move on filling application of user
|
||
|
||
### Args:
|
||
* query (`str`): Some kind of input
|
||
* msg (`Message`): Message that should receive replies
|
||
"""
|
||
|
||
if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
|
||
|
||
col_tmp.insert_one(
|
||
document=DefaultApplicationTemp(self.id).dict
|
||
)
|
||
|
||
progress = col_tmp.find_one({"user": self.id, "type": "application"})
|
||
stage = progress["stage"]
|
||
|
||
if progress["state"] == "fill":
|
||
|
||
if stage == 2:
|
||
|
||
try:
|
||
input_dt = datetime.strptime(query, "%d.%m.%Y")
|
||
except ValueError:
|
||
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
|
||
await msg.reply_text(locale(f"question2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
|
||
return
|
||
|
||
if datetime.now() <= input_dt:
|
||
logWrite(f"User {msg.from_user.id} failed stage {stage} due to joking")
|
||
await msg.reply_text(locale("question2_joke", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
|
||
return
|
||
|
||
elif ((datetime.now() - input_dt).days) < ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_allowed"))).days):
|
||
logWrite(f"User {msg.from_user.id} failed stage {stage} due to being underage")
|
||
await msg.reply_text(locale("question2_underage", "message", locale=self.locale).format(str(configGet("age_allowed"))), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
|
||
return
|
||
|
||
else:
|
||
print(f'Look: {((datetime.now() - input_dt).days)} > {(datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_allowed"))).days}')
|
||
progress["application"][str(stage)] = input_dt
|
||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
|
||
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
|
||
|
||
elif stage == 3:
|
||
try:
|
||
result = (get(f"http://api.geonames.org/searchJSON?q={query}&maxRows=1&countryBias=UA&lang=uk&orderby=relevance&featureClass=P&featureClass=A&username={configGet('username', 'geocoding')}")).json()
|
||
progress["application"][str(stage)] = result["geonames"][0]
|
||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
|
||
await msg.reply_text(locale(f"question3_found", "message", locale=self.locale).format(result["geonames"][0]["name"], result["geonames"][0]["adminName1"]))
|
||
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
|
||
except (ValueError, KeyError, IndexError):
|
||
await msg.reply_text(locale(f"question3_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
|
||
return
|
||
except Exception as exp:
|
||
await msg.reply_text(locale("question3_error", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
|
||
try:
|
||
await app.send_message(configGet("owner"), locale("question3_traceback", "message", locale=self.locale).format(query, exp, print_exc()))
|
||
except bad_request_400.PeerIdInvalid:
|
||
logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
|
||
return
|
||
|
||
elif stage == 10:
|
||
progress["application"][str(stage)] = query
|
||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "complete": True}})
|
||
application_content = []
|
||
i = 1
|
||
for question in progress["application"]:
|
||
if i == 2:
|
||
age = relativedelta(datetime.now(), progress['application']['2'])
|
||
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)")
|
||
elif i == 3:
|
||
if progress['application']['3']['countryCode'] == "UA":
|
||
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']})")
|
||
else:
|
||
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']}, {progress['application']['3']['countryName']})")
|
||
else:
|
||
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application'][question]}")
|
||
i += 1
|
||
await msg.reply_text(locale("confirm", "message", locale=self.locale).format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
|
||
|
||
else:
|
||
progress["application"][str(stage)] = query
|
||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
|
||
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
|
||
|
||
logWrite(f"User {self.id} completed stage {stage} of application") |