# Telegram/modules/commands/warnings.py (147 lines, 5.2 KiB, Python)

from os import path
from app import app
from pyrogram import filters
from pyrogram.types import Message
from pyrogram.client import Client
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from modules.utils import configGet, jsonLoad, locale, should_quote
from modules.database import col_users, col_warnings
from modules import custom_filters
from pykeyboard import InlineKeyboard, InlineButton
def _active_warnings(user_id: int) -> list:
    """Return all active warning documents stored for *user_id*."""
    return list(col_warnings.find({"user": user_id, "active": True}))


def _format_warning(index: int, warning: dict) -> str:
    """Render one warning as a numbered entry (*index* is 0-based)."""
    return f'{index+1}. {warning["date"].strftime("%d.%m.%Y, %H:%M")}\n Адмін: {warning["admin"]}\n Причина: {warning["reason"]}'


def _warnings_overview(msg: Message) -> str:
    """Build the summary of active warnings per cached group member.

    Returns the localized "warnings_empty" text when no cached group member
    has an active warning, otherwise one "warnings_entry" line per user.
    """
    group_members = jsonLoad(
        path.join(configGet("cache", "locations"), "group_members")
    )

    totals = {}
    for warning in col_warnings.find({"active": True}):
        user_id = warning["user"]
        if user_id not in group_members:
            continue

        key = str(user_id)
        if key in totals:
            totals[key]["warns"] += 1
            continue

        # A warned user may have no record (or no name) in the users
        # collection; fall back to the id instead of failing the whole list.
        user_db = col_users.find_one({"user": user_id}) or {}
        totals[key] = {"name": user_db.get("tg_name", key), "warns": 1}

    if not totals:
        return locale("warnings_empty", "message", locale=msg.from_user)

    entry_template = locale("warnings_entry", "message", locale=msg.from_user)
    return "\n".join(
        entry_template.format(entry["name"], key, entry["warns"])
        for key, entry in totals.items()
    )


async def _resolve_target(app: Client, query: str):
    """Resolve *query* to a ``(user_id, display_name)`` tuple.

    A numeric *query* is first looked up in the users collection. If that
    yields nothing usable, the first result of a member search in the
    configured users group is taken. Returns ``None`` when neither matches.
    """
    try:
        user_id = int(query)
    except ValueError:
        # Not a numeric id: only the member search below can match it.
        user_db = None
    else:
        user_db = col_users.find_one({"user": user_id})

    if user_db is not None and "tg_name" in user_db:
        return user_db["user"], user_db["tg_name"]

    async for member in app.get_chat_members(
        configGet("users", "groups"),
        filter=ChatMembersFilter.SEARCH,
        query=query,
    ):
        # Only the first search hit is used.
        return member.user.id, member.user.first_name

    return None


@app.on_message(
    custom_filters.enabled_warnings
    & ~filters.scheduled
    & filters.command(["warnings"], prefixes=["/"])
    & custom_filters.admin
)
async def cmd_warnings(app: Client, msg: Message):
    """Handle the ``/warnings`` command.

    Supported forms:

    * ``/warnings`` - overview of active warnings per cached group member.
    * ``/warnings <user>`` - list the active warnings of one user, where
      ``<user>`` is a numeric id or a search query for the users group.
    * ``/warnings <user> revoke`` - same list plus an inline keyboard with
      one button per warning (callback data ``w_rev_<warning _id>``).

    More than two arguments produce the "syntax_warnings" reply.
    """
    if len(msg.command) == 1:
        await msg.reply_text(
            locale("warnings_all", "message", locale=msg.from_user).format(
                _warnings_overview(msg)
            ),
            quote=should_quote(msg),
        )
        return

    if len(msg.command) > 3:
        await msg.reply_text(
            locale("syntax_warnings", "message", locale=msg.from_user),
            quote=should_quote(msg),
        )
        return

    target = await _resolve_target(app, msg.command[1])
    if target is None:
        await msg.reply_text(
            locale("no_user_warnings", "message", locale=msg.from_user).format(
                msg.command[1]
            ),
            quote=should_quote(msg),
        )
        return
    target_id, target_name = target

    # Fetch once and derive the count from the result, so the emptiness
    # check and the rendered list cannot disagree.
    active = _active_warnings(target_id)
    if not active:
        await msg.reply_text(
            locale("no_warnings", "message", locale=msg.from_user).format(
                target_name, target_id
            ),
            quote=should_quote(msg),
        )
        return

    listing = "\n".join(
        _format_warning(index, warning) for index, warning in enumerate(active)
    )

    if len(msg.command) == 3 and msg.command[2].lower() == "revoke":
        keyboard = InlineKeyboard()
        keyboard.add(
            *(
                InlineButton(str(index + 1), f'w_rev_{str(warning["_id"])}')
                for index, warning in enumerate(active)
            )
        )
        await msg.reply_text(
            locale("warnings_revoke", "message", locale=msg.from_user).format(
                target_name, listing
            ),
            reply_markup=keyboard,
            quote=should_quote(msg),
        )
        return

    warns = len(active)
    # Two templates exist, switched at five warnings; presumably different
    # plural forms of the same message - confirm against the locale files.
    template_key = "warnings_1" if warns <= 5 else "warnings_2"
    await msg.reply_text(
        locale(template_key, "message", locale=msg.from_user).format(
            target_name, target_id, warns, listing, target_id
        ),
        quote=should_quote(msg),
    )