TelegramPoster/plugins/callbacks/submission.py

267 lines
7.7 KiB
Python

import logging
from os import path
from pathlib import Path
from shutil import rmtree
from bson import ObjectId
from libbot import config_get
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
from classes.exceptions import (
SubmissionDuplicatesError,
SubmissionUnavailableError,
SubmissionUnsupportedError,
)
from classes.pyroclient import PyroClient
from modules.database import col_submitted
# Module-level logger, named after this module, shared by all handlers below.
logger = logging.getLogger(__name__)
@Client.on_callback_query(filters.regex(r"sub_yes_[\s\S]*"))
async def callback_query_yes(app: PyroClient, clb: CallbackQuery) -> None:
    """Handle a ``sub_yes_<id>`` callback: accept a submission and upload it.

    The submission ID is the third ``_``-separated field of the callback data.
    On success the submitter is notified, the pressing user gets an alert and
    the first keyboard row of the callback message is replaced by an inert
    "accepted" button.

    Args:
        app: Client used to submit the media and send messages.
        clb: Callback query raised by the pressed inline button.
    """
    user = await app.find_user(clb.from_user)
    fullclb = str(clb.data).split("_")

    # Looked up before submitting: the stored user ID is the fallback recipient
    # of the notification when no original message is returned below.
    db_entry = col_submitted.find_one({"_id": ObjectId(fullclb[2])})

    try:
        submission = await app.submit_media(fullclb[2])
    except SubmissionUnavailableError:
        await clb.answer(
            text=app._("sub_msg_unavail", "callback", locale=user.locale),
            show_alert=True,
        )
        return
    except SubmissionUnsupportedError:
        # NOTE: the original passed a stray ``quote=True`` to str.format(),
        # which silently ignores unused keyword arguments; it is dropped here.
        await clb.answer(
            text=app._("mime_not_allowed", "message", locale=user.locale).format(
                ", ".join(app.config["submission"]["mime_types"])
            ),
            show_alert=True,
        )
        return
    except SubmissionDuplicatesError as exp:
        await clb.answer(
            text=app._("sub_duplicates_found", "callback", locale=user.locale),
            show_alert=True,
        )
        await clb.message.reply_text(
            app._("sub_media_duplicates_list", "message", locale=user.locale).format(
                "\n".join(exp.duplicates)
            ),
            quote=True,
        )
        logger.info(
            "Submission with ID '%s' could not be accepted because of the duplicates: %s",
            fullclb[2],
            str(exp.duplicates),
        )
        return

    # submission[0] is the submitter's original message (may be None),
    # submission[1] is the ID of the uploaded media.
    if submission[0] is not None:
        await submission[0].reply_text(
            app._(
                "sub_yes",
                "message",
                locale=(await app.find_user(submission[0].from_user)).locale,
            ),
            quote=True,
        )
    elif db_entry is not None:
        await app.send_message(
            db_entry["user"],
            app._(
                "sub_yes",
                "message",
                locale=(await app.find_user(db_entry["user"])).locale,
            ),
        )

    await clb.answer(
        text=app._("sub_yes", "callback", locale=user.locale).format(fullclb[2]),
        show_alert=True,
    )

    # Replace the first keyboard row with an inert status button and keep the
    # second row, if the message has one.
    accepted_row = [
        InlineKeyboardButton(
            text=str(app._("accepted", "button", locale=user.locale)),
            callback_data="nothing",
        )
    ]
    keyboard = clb.message.reply_markup.inline_keyboard
    if len(keyboard) > 1:
        edited_markup = [accepted_row, keyboard[1]]
    else:
        edited_markup = [accepted_row]

    if await config_get("send_uploaded_id", "submission"):
        await clb.message.edit_caption(
            f"{clb.message.caption}\n\nID: `{submission[1]}`"
        )

    await clb.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info(
        "Submission with ID '%s' accepted and uploaded with ID '%s'",
        fullclb[2],
        submission[1],
    )
@Client.on_callback_query(filters.regex(r"sub_no_[\s\S]*"))
async def callback_query_no(app: PyroClient, clb: CallbackQuery) -> None:
    """Handle a ``sub_no_<id>`` callback: reject a submission.

    Removes the submission's database entry and its temporary directory (if
    any), replies to the submitter's original message, shows an alert to the
    pressing user and replaces the first keyboard row of the callback message
    with an inert "declined" button.

    Args:
        app: Client used to fetch and send messages.
        clb: Callback query raised by the pressed inline button.
    """
    user = await app.find_user(clb.from_user)
    fullclb = str(clb.data).split("_")

    db_entry = col_submitted.find_one_and_delete({"_id": ObjectId(fullclb[2])})

    # find_one_and_delete yields None when nothing matched (e.g. the entry was
    # already removed); report it instead of failing on the subscripts below.
    if db_entry is None:
        await clb.answer(
            text=app._("sub_msg_unavail", "message", locale=user.locale),
            show_alert=True,
        )
        return

    temp_uuid = db_entry["temp"]["uuid"]
    if temp_uuid is not None:
        temp_dir = Path(f"{app.config['locations']['data']}/submissions/{temp_uuid}")
        if temp_dir.exists():
            rmtree(temp_dir, ignore_errors=True)

    try:
        submission = await app.get_messages(
            db_entry["user"], db_entry["telegram"]["msg_id"]
        )
    except Exception:
        # Deliberately broad: any failure to fetch the original message is
        # reported to the pressing user the same way. Logged for diagnosis.
        logger.warning(
            "Could not fetch the message of submission with ID '%s'",
            fullclb[2],
            exc_info=True,
        )
        await clb.answer(
            text=app._("sub_msg_unavail", "message", locale=user.locale),
            show_alert=True,
        )
        return

    await submission.reply_text(
        app._(
            "sub_no",
            "message",
            locale=(await app.find_user(submission.from_user)).locale,
        ),
        quote=True,
    )

    await clb.answer(
        text=app._("sub_no", "callback", locale=user.locale).format(fullclb[2]),
        show_alert=True,
    )

    # Replace the first keyboard row with an inert status button and keep the
    # second row, if the message has one.
    declined_row = [
        InlineKeyboardButton(
            text=str(app._("declined", "button", locale=user.locale)),
            callback_data="nothing",
        )
    ]
    keyboard = clb.message.reply_markup.inline_keyboard
    if len(keyboard) > 1:
        edited_markup = [declined_row, keyboard[1]]
    else:
        edited_markup = [declined_row]

    await clb.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info(
        "Submission with ID '%s' rejected",
        fullclb[2],
    )
@Client.on_callback_query(filters.regex(r"sub_block_[\s\S]*"))
async def callback_query_block(app: PyroClient, clb: CallbackQuery) -> None:
    """Handle a ``sub_block_<user_id>`` callback: block a user.

    The target user ID is the third ``_``-separated field of the callback
    data. The target is notified and blocked, the pressing user gets an alert,
    and the second keyboard row of the callback message becomes an "unblock"
    button for the same user.

    Args:
        app: Client used to look up users and send messages.
        clb: Callback query raised by the pressed inline button.
    """
    user = await app.find_user(clb.from_user)
    fullclb = str(clb.data).split("_")

    target_id = int(fullclb[2])
    target = await app.find_user(target_id)

    await app.send_message(
        target_id,
        app._("sub_blocked", "message", locale=target.locale),
    )

    # Block the user named in the callback data. The original called
    # user.block(), which acted on whoever pressed the button instead.
    await target.block()

    await clb.answer(
        text=app._("sub_block", "callback", locale=user.locale).format(fullclb[2]),
        show_alert=True,
    )

    # Keep the first row and swap the second one for the opposite action.
    edited_markup = [
        clb.message.reply_markup.inline_keyboard[0],
        [
            InlineKeyboardButton(
                text=str(app._("sub_unblock", "button", locale=user.locale)),
                callback_data=f"sub_unblock_{fullclb[2]}",
            )
        ],
    ]

    await clb.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info("User %s has been blocked", fullclb[2])
@Client.on_callback_query(filters.regex(r"sub_unblock_[\s\S]*"))
async def callback_query_unblock(app: PyroClient, clb: CallbackQuery) -> None:
    """Handle a ``sub_unblock_<user_id>`` callback: unblock a user.

    The target user ID is the third ``_``-separated field of the callback
    data. The target is notified and unblocked, the pressing user gets an
    alert, and the second keyboard row of the callback message becomes a
    "block" button for the same user.

    Args:
        app: Client used to look up users and send messages.
        clb: Callback query raised by the pressed inline button.
    """
    user = await app.find_user(clb.from_user)
    fullclb = str(clb.data).split("_")

    target_id = int(fullclb[2])
    target = await app.find_user(target_id)

    await app.send_message(
        target_id,
        app._("sub_unblocked", "message", locale=target.locale),
    )

    # Unblock the user named in the callback data. The original called
    # user.unblock(), which acted on whoever pressed the button instead.
    await target.unblock()

    await clb.answer(
        text=app._("sub_unblock", "callback", locale=user.locale).format(fullclb[2]),
        show_alert=True,
    )

    # Keep the first row and swap the second one for the opposite action.
    edited_markup = [
        clb.message.reply_markup.inline_keyboard[0],
        [
            InlineKeyboardButton(
                text=str(app._("sub_block", "button", locale=user.locale)),
                callback_data=f"sub_block_{fullclb[2]}",
            )
        ],
    ]

    await clb.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info("User %s has been unblocked", fullclb[2])