Compare commits

...

10 Commits

12 changed files with 173 additions and 29 deletions

7
app.py
View File

@ -8,13 +8,19 @@ from pyrogram.errors import bad_request_400
app = Client("holochecker", bot_token=configGet("bot_token", "bot"), api_id=configGet("api_id", "bot"), api_hash=configGet("api_hash", "bot"))
async def isAnAdmin(admin_id):
# Check if user is mentioned in config
if (admin_id == configGet("owner")) or (admin_id in configGet("admins")):
return True
# Check if user is probably in cache
if path.exists(f"cache{sep}admins") is True:
try:
return True if admin_id in jsonLoad(f"cache{sep}admins") else False
except (FileNotFoundError, JSONDecodeError):
pass
# Check if user is in admin group
try:
async for member in app.get_chat_members(configGet("admin_group")):
if member.user.id == admin_id:
@ -22,4 +28,5 @@ async def isAnAdmin(admin_id):
except bad_request_400.ChannelInvalid:
logWrite(f"Could not get users in admin group to answer isAnAdmin(). Bot is likely not in the group.")
return False
return False

View File

@ -11,7 +11,7 @@ from dateutil.relativedelta import relativedelta
from classes.errors.geo import PlaceNotFoundError
from modules.database import col_tmp, col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
from modules.logging import logWrite
from modules.utils import configGet, find_location, locale, should_quote
from modules.utils import configGet, create_tmp, download_tmp, find_location, locale, should_quote
class DefaultApplicationTemp(dict):
def __init__(self, user: int, reapply: bool = False):
@ -349,14 +349,14 @@ class HoloUser():
* msg (`Message`): Message that should receive replies
"""
if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
# if col_tmp.find_one({"user": self.id, "type": "application"}) is None:
if self.sponsorship_state()[0] == "fill":
return
if self.sponsorship_state()[0] == "fill":
return
col_tmp.insert_one(
document=DefaultApplicationTemp(self.id).dict
)
# col_tmp.insert_one(
# document=DefaultApplicationTemp(self.id).dict
# )
progress = col_tmp.find_one({"user": self.id, "type": "application"})
@ -365,9 +365,9 @@ class HoloUser():
stage = progress["stage"]
if self.sponsorship_state()[0] == "fill":
await msg.reply_text(locale("finish_sponsorship", "message"), quote=should_quote(msg))
return
# if self.sponsorship_state()[0] == "fill":
# await msg.reply_text(locale("finish_sponsorship", "message"), quote=should_quote(msg))
# return
if progress["state"] == "fill" and progress["sent"] is False:
@ -522,11 +522,7 @@ class HoloUser():
elif stage == 3:
if photo is not None:
filename = uuid1()
await app.download_media(photo.file_id, f"tmp{sep}{filename}")
with open(f"tmp{sep}{filename}", "rb") as f:
photo_bytes = f.read()
progress["sponsorship"]["proof"] = photo_bytes
progress["sponsorship"]["proof"] = await download_tmp(app, photo.file_id)
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "stage": progress["stage"]+1}})
await msg.reply_text(locale(f"sponsor{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"sponsor{stage+1}", "force_reply", locale=self.locale))))
@ -536,7 +532,14 @@ class HoloUser():
return
progress["sponsorship"]["label"] = query
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "sponsorship"}}, {"$set": {"sponsorship": progress["sponsorship"], "complete": True}})
await msg.reply_text(locale("sponsor_confirm", "message", locale=self.locale), reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
await msg.reply_photo(
photo=create_tmp(progress["sponsorship"]["proof"], kind="image"),
caption=locale("sponsor_confirm", "message", locale=self.locale).format(
progress["sponsorship"]["streamer"],
progress["sponsorship"]["expires"].strftime("%d.%m.%Y"),
progress["sponsorship"]["label"]
),
reply_markup=ReplyKeyboardMarkup(locale("confirm", "keyboard", locale=self.locale), resize_keyboard=True))
else:
return

View File

@ -13,6 +13,7 @@ makedirs(f'{configGet("cache", "locations")}{sep}avatars', exist_ok=True)
from modules.commands.application import *
from modules.commands.applications import *
from modules.commands.cancel import *
from modules.commands.identify import *
from modules.commands.label import *
from modules.commands.message import *
from modules.commands.nearby import *

View File

@ -30,7 +30,7 @@
"sponsor4": "Яку роль ти бажаєш отримати?\n\n **Підказка**\nНазва ролі повинна бути якось пов'язана зі вказаною дівчиною, не повинна порушувати правила спільноти а також має бути не довше за 16 символів (обмеження Telegram).",
"sponsorship_application_empty": "❌ **Дія неможлива**\nУ тебе немає заповненої та схваленої анкети. Заповни таку за допомогою /reapply та спробуй ще раз після її підтвердження.",
"confirm": "Супер, дякуємо!\n\nБудь ласка, перевір правильність даних:\n{0}\n\nВсе правильно?",
"sponsor_confirm": "Здається, це все. Перевір чи все правильно та жмакни кнопку на клавіатурі щоб продовжити.",
"sponsor_confirm": "**Дані форми:**\nСтрімер: {0}\nПідписка до: {1}\nХочу роль: {2}\n\nПеревір чи все правильно та жмакни кнопку на клавіатурі щоб продовжити.",
"application_sent": "Дякуємо! Ми надіслали твою анкетку на перевірку. Ти отримаєш повідомлення як тільки її перевірять та приймуть рішення. До тих пір від тебе більше нічого не потребується :)",
"sponsorship_sent": "Дякуємо! Ми надіслали форму на перевірку. Ти отримаєш повідомлення як тільки її перевірять та приймуть рішення :)",
"application_got": "Отримано анкету від `{0}`\n\nІм'я тг: `{1}`\nЮзернейм: @{2}\n\n**Дані анкети:**\n{3}",
@ -96,6 +96,12 @@
"nearby_error": "⚠️ **Сталась помилка**\n\nПомилка: `{0}`\n\nTraceback:\n```\n{1}\n```",
"nearby_result": "Результати пошуку:\n\n{0}",
"nearby_empty": "Здається, нікого поблизу немає.",
"cancel": "Всі поточні операції скасовано.",
"identify_invalid_syntax": "Неправильний синтаксис!\nТреба: `/identify ID/NAME/USERNAME`",
"identify_not_found": "Не знайдено користувачів за запитом **{0}**",
"identify_success": "Користувач `{0}`\n\nІм'я: {1}\nЮзернейм: {2}\nЄ в чаті: {3}\nЄ адміном: {4}\nРоль: {5}\nНаявна анкета: {6}\nНаявне спонсорство: {7}",
"yes": "Так",
"no": "Ні",
"voice_message": [
"why are u gae",
"руки відірвало? пиши як людина",
@ -242,6 +248,7 @@
"reboot": "Перезапустити бота",
"label": "Встановити нікнейм користувачу",
"message": "Надіслати користувачу повідомлення",
"identify": "Дізнатись дані про користувача за айді",
"warnings": "Переглянути попередження користувача",
"application": "Переглянути анкету користувача",
"applications": "Отримати всі анкети як JSON"
@ -251,6 +258,7 @@
"label": "Встановити нікнейм користувачу",
"nearby": "Показати користувачів поблизу",
"message": "Надіслати користувачу повідомлення",
"identify": "Дізнатись дані про користувача за айді",
"warnings": "Переглянути попередження користувача",
"application": "Переглянути анкету користувача",
"applications": "Отримати всі анкети як JSON"

View File

@ -22,7 +22,7 @@ async def cmd_application(app: Client, msg: Message):
except (ValueError, UserNotFoundError):
try:
holo_user = HoloUser((await app.get_users(msg.command[1])).id)
except (bad_request_400.UsernameInvalid, bad_request_400.PeerIdInvalid):
except (bad_request_400.UsernameInvalid, bad_request_400.PeerIdInvalid, bad_request_400.UsernameNotOccupied):
await msg.reply_text(locale("no_user_application", "message", locale=msg.from_user).format(msg.command[1]), quote=should_quote(msg))
return

View File

@ -2,8 +2,11 @@ from app import app
from pyrogram import filters
from pyrogram.types import Message
from pyrogram.client import Client
from modules.utils import should_quote
from modules.utils import should_quote, logWrite, locale
from modules.database import col_tmp
@app.on_message(~ filters.scheduled & filters.command("cancel", prefixes=["/"]))
async def command_cancel(app: Client, msg: Message):
await msg.reply_text("Command exists.", quote=should_quote(msg))
col_tmp.delete_many( {"user": msg.from_user.id} )
await msg.reply_text(locale("cancel", "message", locale=msg.from_user), quote=should_quote(msg))
logWrite(f"Cancelling all ongoing tmp operations for {msg.from_user.id}")

View File

@ -0,0 +1,63 @@
from os import path
from app import app, isAnAdmin
from pyrogram import filters
from pyrogram.types import Message
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
from pyrogram.enums.chat_action import ChatAction
from classes.holo_user import HoloUser, UserNotFoundError, UserInvalidError
from modules.utils import jsonLoad, should_quote, logWrite, locale, download_tmp, create_tmp
from modules import custom_filters
@app.on_message(~ filters.scheduled & filters.command("identify", prefixes=["/"]) & custom_filters.admin)
async def command_identify(app: Client, msg: Message):
if len(msg.command) != 2:
await msg.reply_text(locale("identify_invalid_syntax", "message", locale=msg.from_user))
return
try:
try:
holo_user = HoloUser(int(msg.command[1]))
except ValueError:
holo_user = HoloUser(await app.get_users(msg.command[1]))
except (UserInvalidError, UserNotFoundError, bad_request_400.UsernameInvalid, bad_request_400.PeerIdInvalid, bad_request_400.UsernameNotOccupied):
await msg.reply_text(locale("identify_not_found", "message", locale=msg.from_user).format(msg.command[1]))
return
role = holo_user.label
has_application = locale("yes", "message", locale=msg.from_user) if holo_user.application_approved() is True else locale("no", "message", locale=msg.from_user)
has_sponsorship = locale("yes", "message", locale=msg.from_user) if holo_user.sponsorship_valid() is True else locale("no", "message", locale=msg.from_user)
username = holo_user.username if holo_user.username is not None else "N/A"
in_chat = locale("yes", "message", locale=msg.from_user) if (holo_user.id in jsonLoad(path.join("cache", "group_members"))) else locale("no", "message", locale=msg.from_user)
is_admin = locale("yes", "message", locale=msg.from_user) if (await isAnAdmin(holo_user.id)) else locale("no", "message", locale=msg.from_user)
output = locale("identify_success", "message", locale=msg.from_user).format(
holo_user.id,
holo_user.name,
username,
in_chat,
is_admin,
role,
has_application,
has_sponsorship
)
user = await app.get_users(holo_user.id)
if user.photo is not None:
await app.send_chat_action(msg.chat.id, action=ChatAction.UPLOAD_PHOTO)
await msg.reply_photo(
create_tmp(await download_tmp(app, user.photo.big_file_id), kind="image"),
quote=should_quote(msg),
caption=output
)
else:
await app.send_chat_action(msg.chat.id, action=ChatAction.TYPING)
await msg.reply_text(
output,
quote=should_quote(msg)
)
logWrite(f"User {msg.from_user.id} identified user {holo_user.id}")

View File

@ -6,7 +6,7 @@ from modules.utils import locale, should_quote, find_user
from classes.holo_user import HoloUser, LabelTooLongError
from modules import custom_filters
@app.on_message(~ filters.scheduled & filters.private & filters.command(["label"], prefixes=["/"]) & custom_filters.admin)
@app.on_message(~ filters.scheduled & filters.command(["label"], prefixes=["/"]) & custom_filters.admin)
async def cmd_label(app: Client, msg: Message):
if len(msg.command) < 3:

View File

@ -13,18 +13,25 @@ async def cmd_reapply(app: Client, msg: Message):
holo_user = HoloUser(msg.from_user)
if holo_user.application_state()[0] in ["approved", "rejected"]:
if (holo_user.application_state()[1] is True) and (not col_tmp.find_one({"user": holo_user.id, "type": "application"})["sent"]):
# Check if user has approved/rejected tmp application
if (holo_user.application_state()[0] in ["approved", "rejected"]) or (holo_user.application_state()[0] == "none"):
# Check if user's tmp application is already completed or even sent
if ((holo_user.application_state()[1] is True) and (not col_tmp.find_one({"user": holo_user.id, "type": "application"})["sent"])) or (holo_user.application_state()[0] == "none"):
left_chat = True
async for member in app.get_chat_members(configGet("destination_group")):
if member.user.id == msg.from_user.id:
left_chat = False
if not left_chat:
if holo_user.sponsorship_state()[0] == "fill":
await msg.reply_text(locale("finish_sponsorship", "message"), quote=should_quote(msg))
return
holo_user.application_restart(reapply=True)
await welcome_pass(app, msg, once_again=True)
else:
await msg.reply_text(locale("reapply_left_chat", "message", locale=holo_user), reply_markup=InlineKeyboardMarkup([
[
@ -34,15 +41,20 @@ async def cmd_reapply(app: Client, msg: Message):
InlineKeyboardButton(locale("reapply_new_one", "button", locale=holo_user), f"reapply_new_{msg.id}")
]
]))
else:
await msg.reply_text(locale("reapply_in_progress", "message", locale=holo_user).format(locale("confirm", "keyboard", locale=holo_user)[1][0]), reply_markup=InlineKeyboardMarkup([
[
InlineKeyboardButton(locale("applying_stop", "button", locale=holo_user), f"reapply_stop_{msg.id}")
]
]))
else:
if (holo_user.application_state()[0] == "fill") and (col_tmp.find_one({"user": holo_user.id, "type": "application"})["sent"] is True):
await msg.reply_text(locale("reapply_forbidden", "message", locale=holo_user))
else:
await msg.reply_text(locale("reapply_in_progress", "message", locale=holo_user).format(locale("confirm", "keyboard", locale=holo_user)[1][0]), reply_markup=InlineKeyboardMarkup([
[

View File

@ -1,11 +1,15 @@
"""Custom message filters made to improve commands
usage in context of Holo Users."""
from app import isAnAdmin
from modules.database import col_applications
from pyrogram import filters
from pyrogram.types import Message
async def admin_func(_, __, msg):
return await isAnAdmin(msg)
async def admin_func(_, __, msg: Message):
return await isAnAdmin(msg.from_user.id)
async def allowed_func(_, __, msg):
async def allowed_func(_, __, msg: Message):
return True if (col_applications.find_one({"user": msg.from_user.id}) is not None) else False
admin = filters.create(admin_func)

View File

@ -2,6 +2,7 @@ from app import app
from pyrogram import filters
from pyrogram.types import ForceReply, ReplyKeyboardMarkup, Message
from pyrogram.client import Client
from classes.holo_user import HoloUser
from modules.utils import all_locales, locale, logWrite
# Welcome check ================================================================================================================
@ -11,7 +12,7 @@ for pattern in all_locales("welcome", "keyboard"):
for pattern in all_locales("return", "keyboard"):
welcome_1.append(pattern[0][0])
@app.on_message(~ filters.scheduled & filters.private & filters.command(welcome_1, prefixes=[""]))
async def welcome_pass(app, msg, once_again: bool = True) -> None:
async def welcome_pass(app: Client, msg: Message, once_again: bool = True) -> None:
"""Set user's stage to 1 and start a fresh application
### Args:
@ -22,6 +23,10 @@ async def welcome_pass(app, msg, once_again: bool = True) -> None:
if not once_again:
await msg.reply_text(locale("privacy_notice", "message"))
holo_user = HoloUser(msg.from_user)
holo_user.application_restart()
logWrite(f"User {msg.from_user.id} confirmed starting the application")
await msg.reply_text(locale("question1", "message", locale=msg.from_user), reply_markup=ForceReply(placeholder=locale("question1", "force_reply", locale=msg.from_user)))

View File

@ -1,4 +1,5 @@
from typing import Any, Union
from typing import Any, Literal, Union
from uuid import uuid1
from requests import get
from pyrogram.enums.chat_type import ChatType
from pyrogram.types import User
@ -8,7 +9,7 @@ from ujson import JSONDecodeError as JSONDecodeError
from ujson import loads, dumps
from sys import exit
from os import kill, listdir, sep
from os import kill, listdir, makedirs, path, sep
from os import name as osname
from traceback import print_exc
from classes.errors.geo import PlaceNotFoundError
@ -191,6 +192,43 @@ def find_location(query: str) -> dict:
except (ValueError, KeyError, IndexError):
raise PlaceNotFoundError(query)
def create_tmp(bytedata: Union[bytes, bytearray], kind: Union[Literal["image", "video"], None] = None) -> str:
"""Create temporary file to help uploading it
### Args:
* bytedata (`Union[bytes, bytearray]`): Some bytes to be written
* kind (`Union[Literal["image", "video"], None]`): Kind of upload. Will add `.jpg` or `.mp4` if needed
### Returns:
* `str`: Path to temporary file
"""
filename = str(uuid1())
if kind == "image":
filename += ".jpg"
elif kind == "video":
filename += ".mp4"
makedirs("tmp", exist_ok=True)
with open(path.join("tmp", filename), "wb") as file:
file.write(bytedata)
return path.join("tmp", filename)
async def download_tmp(app: Client, file_id: str) -> bytes:
"""Download file by its ID and return its bytes
### Args:
* app (`Client`): App that will download the file
* file_id (`str`): File's unique id
### Returns:
* `bytes`: Bytes of downloaded file
"""
filename = str(uuid1())
makedirs("tmp", exist_ok=True)
await app.download_media(file_id, path.join("tmp", filename))
with open(path.join("tmp", filename), "rb") as f:
bytedata = f.read()
return bytedata
try:
from psutil import Process
except ModuleNotFoundError: