Changed code style to black

This commit is contained in:
2023-03-09 16:25:06 +01:00
parent 5b8951fe07
commit 3304339244
44 changed files with 3243 additions and 936 deletions

View File

@@ -1,5 +1,8 @@
class PlaceNotFoundError(Exception):
"""Query provided did not lead to any city or populated area"""
"""Query provided did not lead to any city or populated area"""
def __init__(self, query):
self.query = query
super().__init__(f"Could not find any place on geonames.org of feature classes A and P by query '{self.query}'")
super().__init__(
f"Could not find any place on geonames.org of feature classes A and P by query '{self.query}'"
)

View File

@@ -1,24 +1,38 @@
"""Exceptions that are meant to be used by HoloUser class
and other modules that handle those exceptions"""
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(f"User of type {type(self.user)} with id {self.user_id} was not found")
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)
class UserInvalidError(Exception):
"""Provided to HoloUser object is not supported"""
"""Provided to HoloUser object is not supported"""
def __init__(self, user):
self.user = user
super().__init__(f"Could not find HoloUser by using {type(self.user)} as an input type")
super().__init__(
f"Could not find HoloUser by using {type(self.user)} as an input type"
)
class LabelTooLongError(Exception):
def __init__(self, label: str) -> None:
self.label = label
super().__init__(f"Could not set label to '{label}' because it is {len(label)} characters long (16 is maximum)")
super().__init__(
f"Could not set label to '{label}' because it is {len(label)} characters long (16 is maximum)"
)
class LabelSettingError(Exception):
def __init__(self, exp: Exception, trace: str) -> None:
super().__init__(f"❌ **Could not set label**\n\nException: `{exp}`\n\n**Traceback:**\n```\n{trace}\n```")
super().__init__(
f"❌ **Could not set label**\n\nException: `{exp}`\n\n**Traceback:**\n```\n{trace}\n```"
)

View File

@@ -4,21 +4,47 @@ from ftfy import fix_text
from traceback import format_exc
from app import app, isAnAdmin
from typing import Any, List, Literal, Union
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice, ForceReply, ReplyKeyboardMarkup
from pyrogram.types import (
User,
ChatMember,
ChatPrivileges,
Chat,
Message,
Photo,
Video,
Document,
Animation,
Voice,
ForceReply,
ReplyKeyboardMarkup,
)
from pyrogram.errors import bad_request_400
from dateutil.relativedelta import relativedelta
from classes.errors.geo import PlaceNotFoundError
from classes.errors.holo_user import UserInvalidError, UserNotFoundError, LabelTooLongError, LabelSettingError
from classes.errors.holo_user import (
UserInvalidError,
UserNotFoundError,
LabelTooLongError,
LabelSettingError,
)
from classes.templates import DefaultApplicationTemp, DefaultSponsorshipTemp
from modules.database import col_tmp, col_users, col_applications, col_sponsorships, col_messages, col_spoilers
from modules.database import (
col_tmp,
col_users,
col_applications,
col_sponsorships,
col_messages,
col_spoilers,
)
from modules.logging import logWrite
from modules.utils import configGet, find_location, locale, should_quote
class HoloUser():
class HoloUser:
"""This object represents a user of HoloChecker bot.
It is primarily used to interact with a database in a more python-friendly way,
as well as provide better programming experience in case of adding new features, etc.
"""
"""
def __init__(self, user: Union[User, List[User], ChatMember, int]) -> None:
"""A user of Holo bot. Used to simplify DB interaction.
@@ -29,7 +55,7 @@ class HoloUser():
### Raises:
* UserInvalidError: Provided to `HoloUser` object is not supported
* UserNotFoundError: `HoloUser` could not find user with such an ID in database
"""
"""
# Determine input object class and extract id
if isinstance(user, list) and len(user) != 0:
@@ -67,34 +93,52 @@ class HoloUser():
self.username = holo_user["tg_username"]
if isinstance(user, User):
if (self.name != user.first_name) and hasattr(user, "first_name") and (user.first_name is not None):
if (
(self.name != user.first_name)
and hasattr(user, "first_name")
and (user.first_name is not None)
):
self.set("name", user.first_name, db_key="tg_name")
if (self.phone != user.phone_number) and hasattr(user, "phone") and (user.phone_number is not None):
if (
(self.phone != user.phone_number)
and hasattr(user, "phone")
and (user.phone_number is not None)
):
self.set("phone", user.phone_number, db_key="tg_phone")
if (self.locale != user.language_code) and hasattr(user, "locale") and (user.language_code is not None):
if (
(self.locale != user.language_code)
and hasattr(user, "locale")
and (user.language_code is not None)
):
self.set("locale", user.language_code, db_key="tg_locale")
if (self.username != user.username) and hasattr(user, "username") and (user.username is not None):
if (
(self.username != user.username)
and hasattr(user, "username")
and (user.username is not None)
):
self.set("username", user.username, db_key="tg_username")
def set(self, key: str, value: Any, db_key: Union[str, None] = None) -> None:
"""Set attribute data and save it into database
### Args:
* `key` (`str`): Attribute to be changed
* `value` (`Any`): Value to set
"""
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
db_key = key if db_key is None else db_key
col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { db_key: value } }, upsert=True)
col_users.update_one(
filter={"_id": self.db_id}, update={"$set": {db_key: value}}, upsert=True
)
logWrite(f"Set attribute {key} of user {self.id} to {value}")
async def message(self,
async def message(
self,
context: Message,
origin: Union[Message, None] = None,
text: Union[str, None] = None,
@@ -105,7 +149,7 @@ class HoloUser():
animation: Union[str, Animation, None] = None,
voice: Union[str, Voice, None] = None,
adm_origin: bool = False,
adm_context: bool = False
adm_context: bool = False,
) -> None:
"""Send a message to user
@@ -121,7 +165,7 @@ class HoloUser():
* voice (`Union[str, Voice, None]`, *optional*): Voice as a voice object or file_id as a string. Defaults to None.
* adm_origin (`bool`, *optional*): Whether origin sender is an admin. Defaults to False.
* adm_context (`bool`, *optional*): Whether context sender is an admin. Defaults to False.
"""
"""
if text is not None:
text = fix_text(text)
@@ -130,18 +174,30 @@ class HoloUser():
# Check if any text available and log message sending
if text is not None:
logWrite(f"{context.from_user.id} sent message '{text}' to {self.id} (source message: {context.id})")
logWrite(
f"{context.from_user.id} sent message '{text}' to {self.id} (source message: {context.id})"
)
elif caption is not None:
logWrite(f"{context.from_user.id} sent message '{caption}' to {self.id} (source message: {context.id})")
logWrite(
f"{context.from_user.id} sent message '{caption}' to {self.id} (source message: {context.id})"
)
else:
logWrite(f"{context.from_user.id} sent message to {self.id} (source message: {context.id})")
logWrite(
f"{context.from_user.id} sent message to {self.id} (source message: {context.id})"
)
# Add notices for admin or user
if text is not None:
if adm_context:
text += locale("message_reply_notice", "message")
elif adm_origin:
text = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + text + locale("message_reply_notice", "message")
text = (
locale("message_from", "message").format(
context.from_user.first_name, context.from_user.id
)
+ text
+ locale("message_reply_notice", "message")
)
else:
text = locale("message_reply_notice", "message")
@@ -149,77 +205,118 @@ class HoloUser():
if adm_context:
caption += locale("message_reply_notice", "message")
elif adm_origin:
caption = locale("message_from", "message").format(context.from_user.first_name, context.from_user.id) + caption + locale("message_reply_notice", "message")
caption = (
locale("message_from", "message").format(
context.from_user.first_name, context.from_user.id
)
+ caption
+ locale("message_reply_notice", "message")
)
else:
caption = locale("message_reply_notice", "message")
# Try sending the message
try:
# Check if origin message exists
# This check decides whether we send_ a message or reply_ to one
if origin is not None:
if photo is not None:
if isinstance(photo, Photo):
photo = photo.file_id
new_message = await origin.reply_cached_media(photo, caption=caption, quote=True)
new_message = await origin.reply_cached_media(
photo, caption=caption, quote=True
)
elif video is not None:
if isinstance(video, Video):
video = video.file_id
new_message = await origin.reply_cached_media(video, caption=caption, quote=True)
new_message = await origin.reply_cached_media(
video, caption=caption, quote=True
)
elif file is not None:
if isinstance(file, Document):
file = file.file_id
new_message = await origin.reply_cached_media(file, caption=caption, quote=True)
new_message = await origin.reply_cached_media(
file, caption=caption, quote=True
)
elif animation is not None:
if isinstance(animation, Animation):
animation = animation.file_id
new_message = await origin.reply_cached_media(animation, caption=caption, quote=True)
new_message = await origin.reply_cached_media(
animation, caption=caption, quote=True
)
elif voice is not None:
if isinstance(voice, Voice):
voice = voice.file_id
new_message = await origin.reply_voice(voice, caption=caption, quote=True)
new_message = await origin.reply_voice(
voice, caption=caption, quote=True
)
else:
new_message = await origin.reply_text(text, quote=True)
else:
if photo is not None:
if isinstance(photo, Photo):
photo = photo.file_id
new_message = await app.send_cached_media(self.id, photo, caption=caption)
new_message = await app.send_cached_media(
self.id, photo, caption=caption
)
elif video is not None:
if isinstance(video, Video):
video = video.file_id
new_message = await app.send_cached_media(self.id, video, caption=caption)
new_message = await app.send_cached_media(
self.id, video, caption=caption
)
elif file is not None:
if isinstance(file, Document):
file = file.file_id
new_message = await app.send_cached_media(self.id, file, caption=caption)
new_message = await app.send_cached_media(
self.id, file, caption=caption
)
elif animation is not None:
if isinstance(animation, Animation):
animation = animation.file_id
new_message = await app.send_cached_media(self.id, animation, caption=caption)
new_message = await app.send_cached_media(
self.id, animation, caption=caption
)
elif voice is not None:
if isinstance(voice, Voice):
voice = voice.file_id
new_message = await app.send_cached_media(self.id, voice, caption=caption)
new_message = await app.send_cached_media(
self.id, voice, caption=caption
)
else:
new_message = await app.send_message(self.id, text)
# Acknowledge sending a message and save entry into DB
await context.reply_text(locale("message_sent", "message"), quote=should_quote(context))
col_messages.insert_one({"origin": {"chat": context.chat.id, "id": context.id}, "destination": {"chat": new_message.chat.id, "id": new_message.id}})
await context.reply_text(
locale("message_sent", "message"), quote=should_quote(context)
)
col_messages.insert_one(
{
"origin": {"chat": context.chat.id, "id": context.id},
"destination": {"chat": new_message.chat.id, "id": new_message.id},
}
)
# Report to admin and to sender about message sending failure
except Exception as exp:
logWrite(f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{format_exc()}")
logWrite(
f"Exception {exp} happened as {context.from_user.id} tried to send message to {self.id}. Traceback:\n{format_exc()}"
)
try:
await app.send_message(configGet("owner"), locale("message_traceback", "message").format(context.from_user.id, self.id, exp, format_exc()))
await app.send_message(
configGet("owner"),
locale("message_traceback", "message").format(
context.from_user.id, self.id, exp, format_exc()
),
)
except bad_request_400.PeerIdInvalid:
logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
await context.reply_text(locale("message_error", "message"), quote=should_quote(context))
logWrite(
f"Could not notify admin about failure when sending message! Admin has never interacted with bot!"
)
await context.reply_text(
locale("message_error", "message"), quote=should_quote(context)
)
async def label_set(self, chat: Chat, label: str) -> None:
"""Set label in destination group
@@ -227,15 +324,23 @@ class HoloUser():
### Args:
* chat (`Chat`): Telegram chat
* label (`str`): Label you want to set
"""
"""
if len(label) > 16:
raise LabelTooLongError(label)
self.label = label
try:
await app.promote_chat_member(configGet("users", "groups"), self.id, privileges=ChatPrivileges(can_pin_messages=True, can_manage_video_chats=True))
await app.promote_chat_member(
configGet("users", "groups"),
self.id,
privileges=ChatPrivileges(
can_pin_messages=True, can_manage_video_chats=True
),
)
if not await isAnAdmin(self.id):
await sleep(0.5)
await app.set_administrator_title(configGet("users", "groups"), self.id, label)
await app.set_administrator_title(
configGet("users", "groups"), self.id, label
)
self.set("label", label)
except Exception as exp:
logWrite(f"Could not set {self.id}'s title to '{self.label}' due to {exp}")
@@ -246,23 +351,29 @@ class HoloUser():
### Args:
* chat (`Chat`): Telegram chat
"""
"""
self.label = ""
self.set("label", "")
await app.set_administrator_title(configGet("users", "groups"), self.id, "")
if not await isAnAdmin(self.id):
await app.promote_chat_member(configGet("users", "groups"), self.id, privileges=ChatPrivileges(
can_manage_chat=False,
can_pin_messages=False,
can_manage_video_chats=False
))
await app.promote_chat_member(
configGet("users", "groups"),
self.id,
privileges=ChatPrivileges(
can_manage_chat=False,
can_pin_messages=False,
can_manage_video_chats=False,
),
)
def application_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
def application_state(
self,
) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
"""Check the current state of application in tmp collection
### Returns:
* `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether application is complete.
"""
"""
tmp_application = col_tmp.find_one({"user": self.id, "type": "application"})
if tmp_application is None:
return "none", False
@@ -274,16 +385,22 @@ class HoloUser():
### Returns:
* `bool`: `True` if yes and `False` if no
"""
return True if col_applications.find_one({"user": self.id}) is not None else False
"""
return (
True if col_applications.find_one({"user": self.id}) is not None else False
)
def application_restart(self, reapply: bool = False) -> None:
"""Reset application of a user in tmp collection and replace it with an empty one
"""
"""Reset application of a user in tmp collection and replace it with an empty one"""
if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
col_tmp.insert_one(document=DefaultApplicationTemp(self.id, reapply=reapply).dict)
col_tmp.insert_one(
document=DefaultApplicationTemp(self.id, reapply=reapply).dict
)
else:
col_tmp.find_one_and_replace({"user": self.id, "type": "application"}, DefaultApplicationTemp(self.id, reapply=reapply).dict)
col_tmp.find_one_and_replace(
{"user": self.id, "type": "application"},
DefaultApplicationTemp(self.id, reapply=reapply).dict,
)
async def application_next(self, query: str, msg: Message) -> None:
"""Move on filling application of user
@@ -291,7 +408,7 @@ class HoloUser():
### Args:
* query (`str`): Some kind of input
* msg (`Message`): Message that should receive replies
"""
"""
# if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
@@ -311,102 +428,294 @@ class HoloUser():
return
stage = progress["stage"]
# if self.sponsorship_state()[0] == "fill":
# await msg.reply_text(locale("finish_sponsorship", "message"), quote=should_quote(msg))
# return
if progress["state"] == "fill" and progress["sent"] is False:
if msg.text is not None:
msg.text = fix_text(str(msg.text))
if stage == 2:
try:
input_dt = datetime.strptime(query, "%d.%m.%Y")
except ValueError:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
await msg.reply_text(locale(f"question2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
logWrite(
f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format"
)
await msg.reply_text(
locale(f"question2_invalid", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
return
if (datetime.now() <= input_dt) or ((datetime.now() - input_dt).days) > ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_maximum"))).days):
logWrite(f"User {msg.from_user.id} failed stage {stage} due to joking")
await msg.reply_text(locale("question2_joke", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
if (datetime.now() <= input_dt) or (
(datetime.now() - input_dt).days
) > (
(
datetime.now()
- datetime.now().replace(
year=datetime.now().year - configGet("age_maximum")
)
).days
):
logWrite(
f"User {msg.from_user.id} failed stage {stage} due to joking"
)
await msg.reply_text(
locale("question2_joke", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale("question2", "force_reply", locale=self.locale)
)
),
)
return
elif ((datetime.now() - input_dt).days) < ((datetime.now() - datetime.now().replace(year=datetime.now().year - configGet("age_allowed"))).days):
logWrite(f"User {msg.from_user.id} failed stage {stage} due to being underage")
await msg.reply_text(locale("question2_underage", "message", locale=self.locale).format(str(configGet("age_allowed"))), reply_markup=ForceReply(placeholder=str(locale("question2", "force_reply", locale=self.locale))))
elif ((datetime.now() - input_dt).days) < (
(
datetime.now()
- datetime.now().replace(
year=datetime.now().year - configGet("age_allowed")
)
).days
):
logWrite(
f"User {msg.from_user.id} failed stage {stage} due to being underage"
)
await msg.reply_text(
locale(
"question2_underage", "message", locale=self.locale
).format(str(configGet("age_allowed"))),
reply_markup=ForceReply(
placeholder=str(
locale("question2", "force_reply", locale=self.locale)
)
),
)
return
else:
progress["application"][str(stage)] = input_dt
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "application"}},
{
"$set": {
"application": progress["application"],
"stage": progress["stage"] + 1,
}
},
)
await msg.reply_text(
locale(f"question{stage+1}", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage+1}",
"force_reply",
locale=self.locale,
)
)
),
)
elif stage == 3:
try:
progress["application"][str(stage)] = find_location(query)
if ("lat" in progress["application"][str(stage)] and "lng" in progress["application"][str(stage)]):
progress["application"][str(stage)]["location"] = [float(progress["application"][str(stage)]["lng"]), float(progress["application"][str(stage)]["lat"])]
if (
"lat" in progress["application"][str(stage)]
and "lng" in progress["application"][str(stage)]
):
progress["application"][str(stage)]["location"] = [
float(progress["application"][str(stage)]["lng"]),
float(progress["application"][str(stage)]["lat"]),
]
del progress["application"][str(stage)]["lat"]
del progress["application"][str(stage)]["lng"]
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
await msg.reply_text(locale("question3_found", "message", locale=self.locale).format(progress["application"][str(stage)]["name"], progress["application"][str(stage)]["adminName1"]))
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "application"}},
{
"$set": {
"application": progress["application"],
"stage": progress["stage"] + 1,
}
},
)
await msg.reply_text(
locale("question3_found", "message", locale=self.locale).format(
progress["application"][str(stage)]["name"],
progress["application"][str(stage)]["adminName1"],
)
)
await msg.reply_text(
locale(f"question{stage+1}", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage+1}",
"force_reply",
locale=self.locale,
)
)
),
)
except PlaceNotFoundError:
await msg.reply_text(locale("question3_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
await msg.reply_text(
locale("question3_invalid", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
return
except Exception as exp:
await msg.reply_text(locale("question3_error", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
await msg.reply_text(
locale("question3_error", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
try:
await app.send_message(configGet("owner"), locale("question3_traceback", "message", locale=self.locale).format(query, exp, format_exc()))
await app.send_message(
configGet("owner"),
locale(
"question3_traceback", "message", locale=self.locale
).format(query, exp, format_exc()),
)
except bad_request_400.PeerIdInvalid:
logWrite(f"Could not notify admin about failure when sending message! Admin has never interacted with bot!")
logWrite(
f"Could not notify admin about failure when sending message! Admin has never interacted with bot!"
)
return
elif stage == 10:
if len(query) > 1024:
await msg.reply_text(locale("question10_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
await msg.reply_text(
locale("question10_too_long", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
return
progress["application"][str(stage)] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "complete": True}})
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "application"}},
{
"$set": {
"application": progress["application"],
"complete": True,
}
},
)
application_content = []
i = 1
for question in progress["application"]:
if i == 2:
age = relativedelta(datetime.now(), progress['application']['2'])
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)")
age = relativedelta(
datetime.now(), progress["application"]["2"]
)
application_content.append(
f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)"
)
elif i == 3:
if progress['application']['3']['countryCode'] == "UA":
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']})")
if progress["application"]["3"]["countryCode"] == "UA":
application_content.append(
f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']})"
)
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']}, {progress['application']['3']['countryName']})")
application_content.append(
f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application']['3']['name']} ({progress['application']['3']['adminName1']}, {progress['application']['3']['countryName']})"
)
else:
application_content.append(f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application'][question]}")
application_content.append(
f"{locale('question'+str(i), 'message', 'question_titles', locale=self.locale)} {progress['application'][question]}"
)
i += 1
await msg.reply_text(locale("confirm", "message", locale=self.locale).format("\n".join(application_content)), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
await msg.reply_text(
locale("confirm", "message", locale=self.locale).format(
"\n".join(application_content)
),
reply_markup=ReplyKeyboardMarkup(
locale("confirm", "keyboard", locale=self.locale),
resize_keyboard=True,
),
)
else:
if len(query) > 256:
await msg.reply_text(locale("question_too_long", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
await msg.reply_text(
locale("question_too_long", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
return
progress["application"][str(stage)] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "application"}},
{
"$set": {
"application": progress["application"],
"stage": progress["stage"] + 1,
}
},
)
await msg.reply_text(
locale(f"question{stage+1}", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"question{stage+1}", "force_reply", locale=self.locale
)
)
),
)
logWrite(f"User {self.id} completed stage {stage} of application")
else:
return
def sponsorship_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
def sponsorship_state(
self,
) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
"""Check the current state of sponsorship in tmp collection
### Returns:
* `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether sponsorship application is complete.
"""
"""
tmp_sponsorship = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
if tmp_sponsorship is None:
return "none", False
@@ -418,30 +727,37 @@ class HoloUser():
### Returns:
* `bool`: `True` if yes and `False` if no
"""
return True if col_sponsorships.find_one({"user": self.id, "expires": {"$gt": datetime.now()}}) is not None else False
"""
return (
True
if col_sponsorships.find_one(
{"user": self.id, "expires": {"$gt": datetime.now()}}
)
is not None
else False
)
def sponsorship_restart(self) -> None:
"""Reset sponsorship of a user in tmp collection and replace it with an empty one
"""
"""Reset sponsorship of a user in tmp collection and replace it with an empty one"""
if col_tmp.find_one({"user": self.id, "type": "sponsorship"}) is None:
col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)
else:
col_tmp.delete_one({"user": self.id, "type": "sponsorship"})
col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)
async def sponsorship_next(self, query: str, msg: Message, photo: Union[Photo, None] = None) -> None:
async def sponsorship_next(
self, query: str, msg: Message, photo: Union[Photo, None] = None
) -> None:
"""Move on filling sponsorship of user
### Args:
* query (`str`): Some kind of input
* msg (`Message`): Message that should receive replies
"""
"""
progress = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
if progress is not None:
stage = progress["stage"]
if msg.text is not None:
@@ -450,58 +766,171 @@ class HoloUser():
msg.caption = fix_text(msg.caption)
if progress["state"] == "fill" and progress["sent"] is False:
if stage == 1:
if len(query) > 240:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
await msg.reply_text(locale(f"sponsor1_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
logWrite(
f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format"
)
await msg.reply_text(
locale(f"sponsor1_invalid", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"sponsor{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
return
progress["sponsorship"]["streamer"] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}},
{
"$set": {
"sponsorship": progress["sponsorship"],
"stage": progress["stage"] + 1,
}
},
)
await msg.reply_text(
locale(f"sponsor{stage+1}", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"sponsor{stage+1}",
"force_reply",
locale=self.locale,
)
)
),
)
elif stage == 2:
try:
input_dt = datetime.strptime(query, "%d.%m.%Y")
except ValueError:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
await msg.reply_text(locale(f"sponsor2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
logWrite(
f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format"
)
await msg.reply_text(
locale(f"sponsor2_invalid", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"sponsor{stage}",
"force_reply",
locale=self.locale,
)
)
),
)
return
if datetime.now() >= input_dt:
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending date in the past")
await msg.reply_text(locale("sponsor2_past", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("sponsor2", "force_reply", locale=self.locale))))
logWrite(
f"User {msg.from_user.id} failed stage {stage} due to sending date in the past"
)
await msg.reply_text(
locale("sponsor2_past", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
"sponsor2", "force_reply", locale=self.locale
)
)
),
)
return
else:
progress["sponsorship"]["expires"] = input_dt
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}},
{
"$set": {
"sponsorship": progress["sponsorship"],
"stage": progress["stage"] + 1,
}
},
)
await msg.reply_text(
locale(f"sponsor{stage+1}", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"sponsor{stage+1}",
"force_reply",
locale=self.locale,
)
)
),
)
elif stage == 3:
if photo is not None:
progress["sponsorship"]["proof"] = photo.file_id
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}},
{
"$set": {
"sponsorship": progress["sponsorship"],
"stage": progress["stage"] + 1,
}
},
)
await msg.reply_text(
locale(f"sponsor{stage+1}", "message", locale=self.locale),
reply_markup=ForceReply(
placeholder=str(
locale(
f"sponsor{stage+1}",
"force_reply",
locale=self.locale,
)
)
),
)
elif stage == 4:
if len(query) > 16:
await msg.reply_text(locale("label_too_long", "message"), reply_markup=ForceReply(placeholder=str(locale("sponsor4", "force_reply", locale=self.locale))))
await msg.reply_text(
locale("label_too_long", "message"),
reply_markup=ForceReply(
placeholder=str(
locale(
"sponsor4", "force_reply", locale=self.locale
)
)
),
)
return
progress["sponsorship"]["label"] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "complete": True}})
col_tmp.update_one(
{"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}},
{
"$set": {
"sponsorship": progress["sponsorship"],
"complete": True,
}
},
)
await msg.reply_cached_media(
progress["sponsorship"]["proof"],
caption=locale("sponsor_confirm", "message", locale=self.locale).format(
caption=locale(
"sponsor_confirm", "message", locale=self.locale
).format(
progress["sponsorship"]["streamer"],
progress["sponsorship"]["expires"].strftime("%d.%m.%Y"),
progress["sponsorship"]["label"]
progress["sponsorship"]["label"],
),
reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
reply_markup=ReplyKeyboardMarkup(
locale("confirm", "keyboard", locale=self.locale),
resize_keyboard=True,
),
)
else:
return
@@ -516,5 +945,9 @@ class HoloUser():
### Returns:
* `bool`: `True` if any not finished spoilers available and `False` if none.
"""
return False if col_spoilers.find_one({"user": self.id, "completed": False}) is None else True
"""
return (
False
if col_spoilers.find_one({"user": self.id, "completed": False}) is None
else True
)

View File

@@ -2,6 +2,7 @@
from datetime import datetime
class DefaultApplicationTemp(dict):
def __init__(self, user: int, reapply: bool = False):
super().__init__({})
@@ -23,10 +24,11 @@ class DefaultApplicationTemp(dict):
"7": None,
"8": None,
"9": None,
"10": None
}
"10": None,
},
}
class DefaultSponsorshipTemp(dict):
def __init__(self, user: int):
super().__init__({})
@@ -41,6 +43,6 @@ class DefaultSponsorshipTemp(dict):
"streamer": None,
"expires": datetime.fromtimestamp(0),
"proof": None,
"label": ""
}
}
"label": "",
},
}