TelegramPoster/plugins/callbacks/submission.py

264 lines
7.7 KiB
Python

import logging
from os import path
from pathlib import Path
from shutil import rmtree
from bson import ObjectId
from libbot import config_get
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
from classes.exceptions import (
SubmissionDuplicatesError,
SubmissionUnavailableError,
SubmissionUnsupportedError,
)
from classes.pyroclient import PyroClient
from classes.user import PosterUser
from modules.database import col_submitted
logger = logging.getLogger(__name__)
@Client.on_callback_query(filters.regex("sub_yes_[\s\S]*"))
async def callback_query_yes(app: PyroClient, clb: CallbackQuery):
fullclb = str(clb.data).split("_")
user_locale = clb.from_user.language_code
db_entry = col_submitted.find_one({"_id": ObjectId(fullclb[2])})
try:
submission = await app.submit_photo(fullclb[2])
except SubmissionUnavailableError:
await clb.answer(
text=app._("sub_msg_unavail", "callback", locale=user_locale),
show_alert=True,
)
return
except SubmissionUnsupportedError:
await clb.answer(
text=app._("mime_not_allowed", "message", locale=user_locale).format(
", ".join(app.config["submission"]["mime_types"]), quote=True
),
show_alert=True,
)
return
except SubmissionDuplicatesError as exp:
await clb.answer(
text=app._("sub_duplicates_found", "callback", locale=user_locale),
show_alert=True,
)
await clb.message.reply_text(
app._("sub_media_duplicates_list", "message", locale=user_locale).format(
"\n".join(exp.duplicates)
),
quote=True,
)
logger.info(
app._(
"submission_duplicate",
"console",
locale=app.config["locale_log"],
).format(
fullclb[2],
str(exp.duplicates),
),
)
return
if submission[0] is not None:
await submission[0].reply_text(
app._("sub_yes", "message", locale=submission[0].from_user.language_code),
quote=True,
)
elif db_entry is not None:
await app.send_message(db_entry["user"], app._("sub_yes", "message"))
await clb.answer(
text=app._("sub_yes", "callback", locale=user_locale).format(fullclb[2]),
show_alert=True,
)
edited_markup = (
[
[
InlineKeyboardButton(
text=str(app._("accepted", "button", locale=user_locale)),
callback_data="nothing",
)
],
clb.message.reply_markup.inline_keyboard[1],
]
if len(clb.message.reply_markup.inline_keyboard) > 1
else [
[
InlineKeyboardButton(
text=str(app._("accepted", "button", locale=user_locale)),
callback_data="nothing",
)
]
]
)
if await config_get("send_uploaded_id", "submission"):
await clb.message.edit_caption(
clb.message.caption + f"\n\nID: `{submission[1]}`"
)
await clb.message.edit_reply_markup(
reply_markup=InlineKeyboardMarkup(edited_markup)
)
logger.info(
app._(
"submission_accepted",
"console",
locale=app.config["locale_log"],
).format(fullclb[2], submission[1]),
)
@Client.on_callback_query(filters.regex("sub_no_[\s\S]*"))
async def callback_query_no(app: PyroClient, clb: CallbackQuery):
fullclb = str(clb.data).split("_")
user_locale = clb.from_user.language_code
db_entry = col_submitted.find_one_and_delete({"_id": ObjectId(fullclb[2])})
if (
db_entry["temp"]["uuid"] is not None
and Path(
f"{app.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}"
).exists()
):
rmtree(
Path(
f"{app.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}"
),
ignore_errors=True,
)
try:
submission = await app.get_messages(
db_entry["user"], db_entry["telegram"]["msg_id"]
)
except Exception as exp:
await clb.answer(
text=app._("sub_msg_unavail", "message", locale=user_locale),
show_alert=True,
)
return
await submission.reply_text(
app._("sub_no", "message", locale=submission.from_user.language_code),
quote=True,
)
await clb.answer(
text=app._("sub_no", "callback", locale=user_locale).format(fullclb[2]),
show_alert=True,
)
edited_markup = (
[
[
InlineKeyboardButton(
text=str(app._("declined", "button", locale=user_locale)),
callback_data="nothing",
)
],
clb.message.reply_markup.inline_keyboard[1],
]
if len(clb.message.reply_markup.inline_keyboard) > 1
else [
[
InlineKeyboardButton(
text=str(app._("declined", "button", locale=user_locale)),
callback_data="nothing",
)
]
]
)
await clb.message.edit_reply_markup(
reply_markup=InlineKeyboardMarkup(edited_markup)
)
logger.info(
app._(
"submission_rejected",
"console",
locale=app.config["locale_log"],
).format(fullclb[2]),
)
@Client.on_callback_query(filters.regex("sub_block_[\s\S]*"))
async def callback_query_block(app: PyroClient, clb: CallbackQuery):
fullclb = str(clb.data).split("_")
user_locale = clb.from_user.language_code
await app.send_message(
int(fullclb[2]),
app._("sub_blocked", "message"),
)
PosterUser(int(fullclb[2])).block()
await clb.answer(
text=app._("sub_block", "callback", locale=user_locale).format(fullclb[2]),
show_alert=True,
)
edited_markup = [
clb.message.reply_markup.inline_keyboard[0],
[
InlineKeyboardButton(
text=str(app._("sub_unblock", "button", locale=user_locale)),
callback_data=f"sub_unblock_{fullclb[2]}",
)
],
]
await clb.message.edit_reply_markup(
reply_markup=InlineKeyboardMarkup(edited_markup)
)
logger.info(
app._(
"user_blocked",
"console",
locale=app.config["locale_log"],
).format(fullclb[2]),
)
@Client.on_callback_query(filters.regex("sub_unblock_[\s\S]*"))
async def callback_query_unblock(app: PyroClient, clb: CallbackQuery):
fullclb = str(clb.data).split("_")
user_locale = clb.from_user.language_code
await app.send_message(int(fullclb[2]), app._("sub_unblocked", "message"))
PosterUser(int(fullclb[2])).unblock()
await clb.answer(
text=app._("sub_unblock", "callback", locale=user_locale).format(fullclb[2]),
show_alert=True,
)
edited_markup = [
clb.message.reply_markup.inline_keyboard[0],
[
InlineKeyboardButton(
text=str(app._("sub_block", "button", locale=user_locale)),
callback_data=f"sub_block_{fullclb[2]}",
)
],
]
await clb.message.edit_reply_markup(
reply_markup=InlineKeyboardMarkup(edited_markup)
)
logger.info(
app._(
"user_unblocked",
"console",
locale=app.config["locale_log"],
).format(fullclb[2]),
)