Added sponsorship support
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
from datetime import datetime
|
||||
from os import sep
|
||||
from uuid import uuid1
|
||||
from requests import get
|
||||
from traceback import print_exc
|
||||
from app import app, isAnAdmin
|
||||
@@ -35,6 +37,24 @@ class DefaultApplicationTemp(dict):
|
||||
}
|
||||
}
|
||||
|
||||
class DefaultSponsorshipTemp(dict):
|
||||
def __init__(self, user: int):
|
||||
super().__init__({})
|
||||
self.dict = {
|
||||
"user": user,
|
||||
"type": "sponsorship",
|
||||
"complete": False,
|
||||
"sent": False,
|
||||
"state": "fill",
|
||||
"stage": 1,
|
||||
"sponsorship": {
|
||||
"streamer": None,
|
||||
"expires": datetime.fromtimestamp(0),
|
||||
"proof": None,
|
||||
"label": ""
|
||||
}
|
||||
}
|
||||
|
||||
class UserNotFoundError(Exception):
|
||||
"""HoloUser could not find user with such an ID in database"""
|
||||
def __init__(self, user, user_id):
|
||||
@@ -103,18 +123,18 @@ class HoloUser():
|
||||
if isinstance(user, User):
|
||||
|
||||
if (self.name != user.first_name) and hasattr(user, "first_name") and (user.first_name is not None):
|
||||
self.set("tg_name", user.first_name)
|
||||
self.set("name", user.first_name, db_key="tg_name")
|
||||
|
||||
if (self.phone != user.phone_number) and hasattr(user, "phone_number") and (user.phone_number is not None):
|
||||
self.set("tg_phone", user.phone_number)
|
||||
if (self.phone != user.phone_number) and hasattr(user, "phone") and (user.phone_number is not None):
|
||||
self.set("phone", user.phone_number, db_key="tg_phone")
|
||||
|
||||
if (self.locale != user.language_code) and hasattr(user, "language_code") and (user.language_code is not None):
|
||||
self.set("tg_locale", user.language_code)
|
||||
if (self.locale != user.language_code) and hasattr(user, "locale") and (user.language_code is not None):
|
||||
self.set("locale", user.language_code, db_key="tg_locale")
|
||||
|
||||
if (self.username != user.username) and hasattr(user, "username") and (user.username is not None):
|
||||
self.set("tg_username", user.username)
|
||||
self.set("username", user.username, db_key="tg_username")
|
||||
|
||||
def set(self, key: str, value: Any) -> None:
|
||||
def set(self, key: str, value: Any, db_key: Union[str, None] = None) -> None:
|
||||
"""Set attribute data and save it into database
|
||||
|
||||
### Args:
|
||||
@@ -124,7 +144,8 @@ class HoloUser():
|
||||
if not hasattr(self, key):
|
||||
raise AttributeError()
|
||||
setattr(self, key, value)
|
||||
col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { key: value } }, upsert=True)
|
||||
db_key = key if db_key is None else db_key
|
||||
col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { db_key: value } }, upsert=True)
|
||||
logWrite(f"Set attribute {key} of user {self.id} to {value}")
|
||||
|
||||
async def message(self,
|
||||
@@ -394,4 +415,97 @@ class HoloUser():
|
||||
logWrite(f"User {self.id} completed stage {stage} of application")
|
||||
|
||||
else:
|
||||
return
|
||||
return
|
||||
|
||||
def sponsorship_state(self) -> tuple[Literal["none", "fill", "approved", "rejected"], bool]:
|
||||
"""Check the current state of sponsorship in tmp collection
|
||||
|
||||
### Returns:
|
||||
* `tuple[Literal["none", "fill", "approved", "rejected"], bool]`: First element is an enum of a state and the second one is whether sponsorship application is complete.
|
||||
"""
|
||||
tmp_sponsorship = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
|
||||
if tmp_sponsorship is None:
|
||||
return "none", False
|
||||
else:
|
||||
return tmp_sponsorship["state"], tmp_sponsorship["complete"]
|
||||
|
||||
def sponsorship_valid(self) -> bool:
|
||||
"""Check whether user has a valid sponsorship
|
||||
|
||||
### Returns:
|
||||
* `bool`: `True` if yes and `False` if no
|
||||
"""
|
||||
return True if col_sponsorships.find_one({"user": self.id, "expires": {"$gt": datetime.now()}}) is not None else False
|
||||
|
||||
def sponsorship_restart(self) -> None:
|
||||
"""Reset sponsorship of a user in tmp collection and replace it with an empty one
|
||||
"""
|
||||
if col_tmp.find_one({"user": self.id, "type": "sponsorship"}) is None:
|
||||
col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)
|
||||
else:
|
||||
col_tmp.delete_one({"user": self.id, "type": "sponsorship"})
|
||||
col_tmp.insert_one(document=DefaultSponsorshipTemp(self.id).dict)
|
||||
|
||||
async def sponsorship_next(self, query: str, msg: Message, photo: Union[Photo, None] = None) -> None:
|
||||
"""Move on filling sponsorship of user
|
||||
|
||||
### Args:
|
||||
* query (`str`): Some kind of input
|
||||
* msg (`Message`): Message that should receive replies
|
||||
"""
|
||||
|
||||
if col_tmp.find_one({"user": self.id, "type": "sponsorship"}) is not None:
|
||||
|
||||
progress = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
|
||||
stage = progress["stage"]
|
||||
|
||||
if progress["state"] == "fill":
|
||||
|
||||
if stage == 1:
|
||||
|
||||
progress["sponsorship"]["streamer"] = query
|
||||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
|
||||
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
|
||||
|
||||
elif stage == 2:
|
||||
|
||||
try:
|
||||
input_dt = datetime.strptime(query, "%d.%m.%Y")
|
||||
except ValueError:
|
||||
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending invalid date format")
|
||||
await msg.reply_text(locale(f"sponsor2_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage}", "force_reply", locale=self.locale))))
|
||||
return
|
||||
|
||||
if datetime.now() >= input_dt:
|
||||
logWrite(f"User {msg.from_user.id} failed stage {stage} due to sending date in the past")
|
||||
await msg.reply_text(locale("sponsor2_past", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale("sponsor2", "force_reply", locale=self.locale))))
|
||||
return
|
||||
|
||||
else:
|
||||
progress["sponsorship"]["expires"] = input_dt
|
||||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
|
||||
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
|
||||
|
||||
elif stage == 3:
|
||||
|
||||
if photo is not None:
|
||||
filename = uuid1()
|
||||
await app.download_media(photo.file_id, f"tmp{sep}{filename}")
|
||||
with open(f"tmp{sep}{filename}", "rb") as f:
|
||||
photo_bytes = f.read()
|
||||
progress["sponsorship"]["proof"] = photo_bytes
|
||||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
|
||||
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
|
||||
|
||||
elif stage == 4:
|
||||
progress["sponsorship"]["label"] = query
|
||||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "complete": True}})
|
||||
await msg.reply_text("Sponsorship application is filled. Want to send it?", reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
|
||||
|
||||
else:
|
||||
return
|
||||
|
||||
logWrite(f"User {self.id} completed stage {stage} of sponsorship")
|
||||
|
||||
else:
|
||||
return
|
Reference in New Issue
Block a user