This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/classes/holo_user.py

520 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from datetime import datetime
from asyncio import sleep
from ftfy import fix_text
from traceback import format_exc
from app import app, isAnAdmin
from typing import Any, List, Literal, Union
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice, ForceReply, ReplyKeyboardMarkup
from pyrogram.errors import bad_request_400
from dateutil.relativedelta import relativedelta
from classes.errors.geo import PlaceNotFoundError
from classes.errors.holo_user import UserInvalidError, UserNotFoundError, LabelTooLongError, LabelSettingError
from classes.templates import DefaultApplicationTemp, DefaultSponsorshipTemp
from modules.database import col_tmp, col_users, col_applications, col_sponsorships, col_messages, col_spoilers
from modules.logging import logWrite
from modules.utils import configGet, find_location, locale, should_quote
class HoloUser():
"""This object represents a user of HoloChecker bot.
It is primarily used to interact with a database in a more python-friendly way,
as well as provide better programming experience in case of adding new features, etc.
"""
def __init__(self, user: Union[User, List[User], ChatMember, int]) -> None:
"""A user of Holo bot. Used to simplify DB interaction.
### Args:
* user (`Union[User, List[User], ChatMember, int]`): Any possible way to identify the user. Pass `User` object, user's telegram ID, etc...
### Raises:
* UserInvalidError: Provided to `HoloUser` object is not supported
* UserNotFoundError: `HoloUser` could not find user with such an ID in database
"""
# Determine input object class and extract id
if isinstance(user, list) and len(user) != 0:
self.id = user[0].id
elif isinstance(user, User):
self.id = user.id
elif isinstance(user, ChatMember):
self.id = user.user.id
elif isinstance(user, int):
self.id = user
# elif isinstance(user, str):
# try:
# get_users = async_to_sync(app.get_users)
# self.id = get_users(user).id # this line requires testing though
# except bad_request_400.UsernameNotOccupied:
# raise UserInvalidError(user)
# except bad_request_400.PeerIdInvalid:
# raise UserInvalidError(user)
else:
raise UserInvalidError(user)
# Find user record in DB
holo_user = col_users.find_one({"user": self.id})
if holo_user is None:
raise UserNotFoundError(user=user, user_id=self.id)
self.db_id = holo_user["_id"]
self.link = holo_user["link"]
self.label = holo_user["label"]
self.name = holo_user["tg_name"]
self.phone = holo_user["tg_phone"]
self.locale = holo_user["tg_locale"]
self.username = holo_user["tg_username"]
if isinstance(user, User):
if (self.name != user.first_name) and hasattr(user, "first_name") and (user.first_name is not None):
self.set("name", user.first_name, db_key="tg_name")
if (self.phone != user.phone_number) and hasattr(user, "phone") and (user.phone_number is not None):
self.set("phone", user.phone_number, db_key="tg_phone")
if (self.locale != user.language_code) and hasattr(user, "locale") and (user.language_code is not None):
self.set("locale", user.language_code, db_key="tg_locale")
if (self.username != user.username) and hasattr(user, "username") and (user.username is not None):
self.set("username", user.username, db_key="tg_username")
def set(self, key: str, value: Any, db_key: Union[str, None] = None) -> None:
"""Set attribute data and save it into database
### Args:
* `key` (`str`): Attribute to be changed
* `value` (`Any`): Value to set
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
db_key = key if db_key is None else db_key
col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { db_key: value } }, upsert=True)
logWrite(f"Set attribute {key} of user {self.id} to {value}")
async def message(self,
context: Message,
origin: Union[Message, None] = None,
text: Union[str, None] = None,
caption: Union[str, None] = None,
photo: Union[str, Photo, None] = None,
video: Union[str, Video, None] = None,
file: Union[str, Document, None] = None,
animation: Union[str, Animation, None] = None,
voice: Union[str, Voice, None] = None,
adm_origin: bool = False,
adm_context: bool = False
) -> None:
"""Send a message to user
### Args:
* context (`Message`): Context (mostly the message where this method is called)
* origin (`Union[Message, None]`, *optional*): Origin message where to refer. None if called in a command. Defaults to None.
* text (`Union[str, None]`, *optional*): Text if this is a simple text message. Defaults to None.
* caption (`Union[str, None]`, *optional*): Text if this is a media message. Defaults to None.
* photo (`Union[str, Photo, None]`, *optional*): Photo as a photo object or file_id as a string. Defaults to None.
* video (`Union[str, Video, None]`, *optional*): Video as a video object or file_id as a string. Defaults to None.
* file (`Union[str, Document, None]`, *optional*): File as a document object or file_id as a string. Defaults to None.
* animation (`Union[str, Animation, None]`, *optional*): Animation as an animation object or file_id as a string. Defaults to None.
* voice (`Union[str, Voice, None]`, *optional*): Voice as a voice object or file_id as a string. Defaults to None.
* adm_origin (`bool`, *optional*): Whether origin sender is an admin. Defaults to False.
* adm_context (`bool`, *optional*): Whether context sender is an admin. Defaults to False.
"""
if text is not None:
text = fix_text(text)
elif caption is not None:
caption = fix_text(caption)
# Check if any text available and log message sending
if text is not None:
logWrite(f"{context.from_user.id} sent message '{text}' to {self.id} (source message: {context.id})")
elif caption is not None:
logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id} (source message: {context.id})")
else:
logWrite(f"{context.from_user.id} sent message to {self.id} (source message: {context.id})")
# Add notices for admin or user
if text is not None:
if adm_context:
text += locale("message_reply_notice", "message")
elif adm_origin:
text = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + text + locale("message_reply_notice", "message")
else:
text = locale("message_reply_notice", "message")
if caption is not None:
if adm_context:
caption += locale("message_reply_notice", "message")
elif adm_origin:
caption = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + caption + locale("message_reply_notice", "message")
else:
caption = locale("message_reply_notice", "message")
# Try sending the message
try:
# Check if origin message exists
# This check decides whether we send_ a message or reply_ to one
if origin is not None:
if photo is not None:
if isinstance(photo, Photo):
photo = photo.file_id
new_message = await origin.reply_cached_media(photo, caption=caption, quote=True)
elif video is not None:
if isinstance(video, Video):
video = video.file_id
new_message = await origin.reply_cached_media(video, caption=caption, quote=True)
elif file is not None:
if isinstance(file, Document):
file = file.file_id
new_message = await origin.reply_cached_media(file, caption=caption, quote=True)
elif animation is not None:
if isinstance(animation, Animation):
animation = animation.file_id
new_message = await origin.reply_cached_media(animation, caption=caption, quote=True)
elif voice is not None:
if isinstance(voice, Voice):
voice = voice.file_id
new_message = await origin.reply_voice(voice, caption=caption, quote=True)
else:
new_message = await origin.reply_text(text, quote=True)
else:
if photo is not None:
if isinstance(photo, Photo):
photo = photo.file_id
new_message = await app.send_cached_media(self.id, photo, caption=caption)
elif video is not None:
if isinstance(video, Video):
video = video.file_id
new_message = await app.send_cached_media(self.id, video, caption=caption)
elif file is not None:
if isinstance(file, Document):
file = file.file_id
new_message = await app.send_cached_media(self.id, file, caption=caption)
elif animation is not None:
if isinstance(animation, Animation):
animation = animation.file_id
new_message = await app.send_cached_media(self.id, animation, caption=caption)
elif voice is not None:
if isinstance(voice, Voice):
voice = voice.file_id
new_message = await app.send_cached_media(self.id, voice, caption=caption)
else:
new_message = await app.send_message(self.id, text)
# Acknowledge sending a message and save entry into DB
await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))
col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})
# Report to admin and to sender about message sending failure
except Exception as exp:
logWrite(f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{format_exc()}")
try:
await app.send_message(configGet("owner"), locale("message_traceback", "message").format(context.from_user.id, self.id, exp, format_exc()))
except bad_request_400.PeerIdInvalid:
logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
await context.reply_text(locale("message_error", "message"), quote=should_quote(context))
async def label_set(self, chat: Chat, label: str) -> None:
"""Set label in destination group
### Args:
* chat (`Chat`): Telegram chat
* label (`str`): Label you want to set
"""
if len(label) > 16:
raise LabelTooLongError(label)
self.label = label
try:
await app.promote_chat_member(configGet("users", "groups"), self.id, privileges=ChatPrivileges(can_pin_messages=True, can_manage_video_chats=True))
if not await isAnAdmin(self.id):
await sleep(0.5)
await app.set_administrator_title(configGet("users", "groups"), self.id, label)
self.set("label", label)
except Exception as exp:
logWrite(f"Could not set {self.id}'s title to '{self.label}' due to {exp}")
raise LabelSettingError(exp, format_exc())
async def label_reset(self, chat: Chat) -> None:
"""Reset label in destination group
### Args:
* chat (`Chat`): Telegram chat
"""
self.label = ""
self.set("label", "")
await app.set_administrator_title(configGet("users", "groups"), self.id, "")
if not await isAnAdmin(self.id):
await app.promote_chat_member(configGet("users", "groups"), self.id, privileges=ChatPrivileges(
can_manage_chat=False,
can_pin_messages=False,
can_manage_video_chats=False
))
def application_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
"""Check the current state of application in tmp collection
### Returns:
* `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether application is complete.
"""
tmp_application = col_tmp.find_one({"user": self.id, "type": "application"})
if tmp_application is None:
return "none", False
else:
return tmp_application["state"], tmp_application["complete"]
def application_approved(self) -> bool:
"""Check whether user has a completed application and it got approved
### Returns:
* `bool`: `True` if yes and `False` if no
"""
return True if col_applications.find_one({"user": self.id}) is not None else False
def application_restart(self, reapply: bool = False) -> None:
"""Reset application of a user in tmp collection and replace it with an empty one
"""
if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
col_tmp.insert_one(document=DefaultApplicationTemp(self.id, reapply=reapply).dict)
else:
col_tmp.find_one_and_replace({"user": self.id, "type": "application"}, DefaultApplicationTemp(self.id, reapply=reapply).dict)
async def application_next(self, query: str, msg: Message) -> None:
"""Move on filling application of user
### Args:
* query (`str`): Some kind of input
* msg (`Message`): Message that should receive replies
"""
# if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
if self.sponsorship_state()[0] == "fill":
return
if self.spoiler_state() is True:
return
# col_tmp.insert_one(
# document=DefaultApplicationTemp(self.id).dict
# )
progress = col_tmp.find_one({"user": self.id, "type": "application"})
if progress is None:
return
stage = progress["stage"]
# if self.sponsorship_state()[0] == "fill":
# await msg.reply_text(locale("finish_sponsorship", "message"), quote=should_quote(msg))
# return
if progress["state"] == "fill" and progress["sent"] is False:
if msg.text is not None:
msg.text = fix_text(msg.text)
if stage == 2:
try:
input_dt = datetime.strptime(query, "%d.%m.%Y")
except ValueError:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
await msg.reply_text(locale(f"question2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
return
if (datetime.now() <= input_dt) or ((datetime.now() - input_dt).days) > ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_maximum"))).days):
logWrite(f"User {msg.from_user.id} failed stage {stage} due to joking")
await msg.reply_text(locale("question2_joke", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
return
elif ((datetime.now() - input_dt).days) < ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_allowed"))).days):
logWrite(f"User {msg.from_user.id} failed stage {stage} due to being underage")
await msg.reply_text(locale("question2_underage", "message", locale=self.locale).format(str(configGet("age_allowed"))), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
return
else:
progress["application"][str(stage)] = input_dt
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
elif stage == 3:
try:
progress["application"][str(stage)] = find_location(query)
if ("lat" in progress["application"][str(stage)] and "lng" in progress["application"][str(stage)]):
progress["application"][str(stage)]["location"] = [float(progress["application"][str(stage)]["lng"]), float(progress["application"][str(stage)]["lat"])]
del progress["application"][str(stage)]["lat"]
del progress["application"][str(stage)]["lng"]
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
await msg.reply_text(locale("question3_found", "message", locale=self.locale).format(progress["application"][str(stage)]["name"], progress["application"][str(stage)]["adminName1"]))
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
except PlaceNotFoundError:
await msg.reply_text(locale("question3_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
return
except Exception as exp:
await msg.reply_text(locale("question3_error", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
try:
await app.send_message(configGet("owner"), locale("question3_traceback", "message", locale=self.locale).format(query, exp, format_exc()))
except bad_request_400.PeerIdInvalid:
logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
return
elif stage == 10:
if len(query) > 1024:
await msg.reply_text(locale("question10_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
return
progress["application"][str(stage)] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "complete": True}})
application_content = []
i = 1
for question in progress["application"]:
if i == 2:
age = relativedelta(datetime.now(), progress['application']['2'])
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)")
elif i == 3:
if progress['application']['3']['countryCode'] == "UA":
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']})")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']}, {progress['application']['3']['countryName']})")
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application'][question]}")
i += 1
await msg.reply_text(locale("confirm", "message", locale=self.locale).format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
else:
if len(query) > 256:
await msg.reply_text(locale("question_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
return
progress["application"][str(stage)] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
logWrite(f"User {self.id} completed stage {stage} of application")
else:
return
def sponsorship_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
"""Check the current state of sponsorship in tmp collection
### Returns:
* `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether sponsorship application is complete.
"""
tmp_sponsorship = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
if tmp_sponsorship is None:
return "none", False
else:
return tmp_sponsorship["state"], tmp_sponsorship["complete"]
def sponsorship_valid(self) -> bool:
"""Check whether user has a valid sponsorship
### Returns:
* `bool`: `True` if yes and `False` if no
"""
return True if col_sponsorships.find_one({"user": self.id, "expires": {"$gt": datetime.now()}}) is not None else False
def sponsorship_restart(self) -> None:
"""Reset sponsorship of a user in tmp collection and replace it with an empty one
"""
if col_tmp.find_one({"user": self.id, "type": "sponsorship"}) is None:
col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)
else:
col_tmp.delete_one({"user": self.id, "type": "sponsorship"})
col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)
async def sponsorship_next(self, query: str, msg: Message, photo: Union[Photo, None] = None) -> None:
"""Move on filling sponsorship of user
### Args:
* query (`str`): Some kind of input
* msg (`Message`): Message that should receive replies
"""
progress = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
if progress is not None:
stage = progress["stage"]
if msg.text is not None:
msg.text = fix_text(msg.text)
elif msg.caption is not None:
msg.caption = fix_text(msg.caption)
if progress["state"] == "fill" and progress["sent"] is False:
if stage == 1:
if len(query) > 240:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
await msg.reply_text(locale(f"sponsor1_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
return
progress["sponsorship"]["streamer"] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
elif stage == 2:
try:
input_dt = datetime.strptime(query, "%d.%m.%Y")
except ValueError:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
await msg.reply_text(locale(f"sponsor2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
return
if datetime.now() >= input_dt:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending date in the past")
await msg.reply_text(locale("sponsor2_past", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("sponsor2", "force_reply", locale=self.locale))))
return
else:
progress["sponsorship"]["expires"] = input_dt
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
elif stage == 3:
if photo is not None:
progress["sponsorship"]["proof"] = photo.file_id
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
elif stage == 4:
if len(query) > 16:
await msg.reply_text(locale("label_too_long", "message"), reply_markup=ForceReply(placeholder=str(locale("sponsor4", "force_reply", locale=self.locale))))
return
progress["sponsorship"]["label"] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "complete": True}})
await msg.reply_cached_media(
progress["sponsorship"]["proof"],
caption=locale("sponsor_confirm", "message", locale=self.locale).format(
progress["sponsorship"]["streamer"],
progress["sponsorship"]["expires"].strftime("%d.%m.%Y"),
progress["sponsorship"]["label"]
),
reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
else:
return
logWrite(f"User {self.id} completed stage {stage} of sponsorship")
else:
return
def spoiler_state(self) -> bool:
"""Check if user has any started but not finished spoilers
### Returns:
* `bool`: `True` if any not finished spoilers available and `False` if none.
"""
return False if col_spoilers.find_one({"user": self.id, "completed": False}) is None else True