This repository has been archived on 2024-08-21. You can view files and clone it, but cannot push or open issues or pull requests.
TelegramPoster/plugins/callbacks/submission.py

277 lines
8.2 KiB
Python

import logging
from os import path
from pathlib import Path
from shutil import rmtree
from bson import ObjectId
from libbot import config_get
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup
from classes.exceptions import (
SubmissionDuplicatesError,
SubmissionUnavailableError,
SubmissionUnsupportedError,
)
from classes.pyroclient import PyroClient
from modules.database import col_submitted
logger = logging.getLogger(__name__)
@Client.on_callback_query(filters.regex(r"sub_yes_[\s\S]*"))
async def callback_query_yes(app: PyroClient, callback: CallbackQuery) -> None:
    """Handle a ``sub_yes_<id>[_caption]`` callback: accept a submission.

    The submission ID is the third ``_``-separated token of the callback
    data. The media is submitted through ``app.submit_media``; on success
    the submitter is notified, the pressed button is replaced by a static
    "accepted" button and the acceptance is logged.

    Args:
        app: Bot client the handler is registered on.
        callback: Callback query produced by the pressed inline button.
    """
    user = await app.find_user(callback.from_user)
    fullcallback = str(callback.data).split("_")
    submission_id = fullcallback[2]

    # Fetched before submitting so the submitter can still be notified
    # when submit_media() does not hand back the original message.
    db_entry = await col_submitted.find_one({"_id": ObjectId(submission_id)})

    try:
        submission = await app.submit_media(
            submission_id,
            # The caption is kept only when the callback data carries
            # a "caption" token.
            purge_caption=("caption" not in fullcallback),
        )
    except SubmissionUnavailableError:
        await callback.answer(
            text=app._("sub_msg_unavail", "callback", locale=user.locale),
            show_alert=True,
        )
        return
    except SubmissionUnsupportedError:
        await callback.answer(
            text=app._("mime_not_allowed", "message", locale=user.locale).format(
                ", ".join(app.config["submission"]["mime_types"])
            ),
            show_alert=True,
        )
        return
    except SubmissionDuplicatesError as exc:
        await callback.answer(
            text=app._("sub_duplicates_found", "callback", locale=user.locale),
            show_alert=True,
        )
        await callback.message.reply_text(
            app._("sub_media_duplicates_list", "message", locale=user.locale).format(
                "\n".join(exc.duplicates)
            ),
            quote=True,
        )
        logger.info(
            "Submission with ID '%s' could not be accepted because of the duplicates: %s",
            submission_id,
            str(exc.duplicates),
        )
        return

    # submission is indexed as (original message or None, uploaded ID).
    if submission[0] is not None:
        await submission[0].reply_text(
            app._(
                "sub_yes",
                "message",
                locale=(await app.find_user(submission[0].from_user)).locale,
            ),
            quote=True,
        )
    elif db_entry is not None:
        # No message to reply to: message the submitter directly instead.
        await app.send_message(
            db_entry["user"],
            app._(
                "sub_yes",
                "message",
                locale=(await app.find_user(db_entry["user"])).locale,
            ),
        )

    await callback.answer(
        text=app._("sub_yes", "callback", locale=user.locale).format(submission_id),
        show_alert=True,
    )

    accepted_row = [
        InlineKeyboardButton(
            text=str(app._("accepted", "button", locale=user.locale)),
            callback_data="nothing",
        )
    ]
    keyboard = callback.message.reply_markup.inline_keyboard

    # Keep the second keyboard row, when there is one, under the new
    # "accepted" button.
    edited_markup = [accepted_row, keyboard[1]] if len(keyboard) > 1 else [accepted_row]

    if await config_get("send_uploaded_id", "submission"):
        await callback.message.edit_caption(
            f"{callback.message.caption}\n\nID: `{submission[1]}`"
        )

    await callback.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info(
        "Submission with ID '%s' accepted and uploaded with ID '%s'",
        submission_id,
        submission[1],
    )
@Client.on_callback_query(filters.regex(r"sub_no_[\s\S]*"))
async def callback_query_no(app: PyroClient, callback: CallbackQuery) -> None:
    """Handle a ``sub_no_<id>`` callback: reject a submission.

    The submission's database entry is looked up and removed, its temporary
    files (if any) are deleted, the submitter is notified by a reply to the
    original message, and the pressed button is replaced by a static
    "declined" button.

    Args:
        app: Bot client the handler is registered on.
        callback: Callback query produced by the pressed inline button.
    """
    user = await app.find_user(callback.from_user)
    fullcallback = str(callback.data).split("_")
    submission_id = fullcallback[2]
    entry_filter = {"_id": ObjectId(submission_id)}

    # delete_one() only reports the outcome of the deletion, not the stored
    # document, so the entry has to be read before it is removed.
    db_entry = await col_submitted.find_one(entry_filter)

    if db_entry is None:
        # NOTE(review): callback_query_yes reads "sub_msg_unavail" from the
        # "callback" section; the "message" section is kept here as in the
        # original - confirm which one is intended.
        await callback.answer(
            text=app._("sub_msg_unavail", "message", locale=user.locale),
            show_alert=True,
        )
        return

    await col_submitted.delete_one(entry_filter)

    # An empty uuid would make the path resolve to the whole "submissions"
    # directory, so a non-empty value is required before removing anything.
    temp_uuid = (db_entry.get("temp") or {}).get("uuid")
    if temp_uuid:
        temp_dir = Path(f"{app.config['locations']['data']}/submissions/{temp_uuid}")
        if temp_dir.exists():
            rmtree(temp_dir, ignore_errors=True)

    try:
        submission = await app.get_messages(
            db_entry["user"], db_entry["telegram"]["msg_id"]
        )
    except Exception:
        # Best effort: whatever went wrong, tell the moderator the original
        # message cannot be reached, but keep a trace of the cause.
        logger.warning(
            "Could not fetch the message of submission with ID '%s'",
            submission_id,
            exc_info=True,
        )
        await callback.answer(
            text=app._("sub_msg_unavail", "message", locale=user.locale),
            show_alert=True,
        )
        return

    await submission.reply_text(
        app._(
            "sub_no",
            "message",
            locale=(await app.find_user(submission.from_user)).locale,
        ),
        quote=True,
    )

    await callback.answer(
        text=app._("sub_no", "callback", locale=user.locale).format(submission_id),
        show_alert=True,
    )

    declined_row = [
        InlineKeyboardButton(
            text=str(app._("declined", "button", locale=user.locale)),
            callback_data="nothing",
        )
    ]
    keyboard = callback.message.reply_markup.inline_keyboard

    # Keep the second keyboard row, when there is one, under the new
    # "declined" button.
    edited_markup = [declined_row, keyboard[1]] if len(keyboard) > 1 else [declined_row]

    await callback.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info(
        "Submission with ID '%s' rejected",
        submission_id,
    )
@Client.on_callback_query(filters.regex(r"sub_block_[\s\S]*"))
async def callback_query_block(app: PyroClient, callback: CallbackQuery) -> None:
    """Handle a ``sub_block_<user_id>`` callback: block a user.

    The target user ID is the third ``_``-separated token of the callback
    data. The target is notified and blocked, and the second keyboard row
    is replaced by an "unblock" button for the same user.

    Args:
        app: Bot client the handler is registered on.
        callback: Callback query produced by the pressed inline button.
    """
    user = await app.find_user(callback.from_user)
    fullcallback = str(callback.data).split("_")
    target_id = int(fullcallback[2])

    # The user to block is the one named in the callback data, not the
    # user who pressed the button.
    target = await app.find_user(target_id)

    await app.send_message(
        target_id,
        app._("sub_blocked", "message", locale=target.locale),
    )
    await target.block()

    await callback.answer(
        text=app._("sub_block", "callback", locale=user.locale).format(fullcallback[2]),
        show_alert=True,
    )

    # Keep the first row as is and offer the opposite action in the second.
    edited_markup = [
        callback.message.reply_markup.inline_keyboard[0],
        [
            InlineKeyboardButton(
                text=str(app._("sub_unblock", "button", locale=user.locale)),
                callback_data=f"sub_unblock_{fullcallback[2]}",
            )
        ],
    ]

    await callback.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info("User %s has been blocked", fullcallback[2])
@Client.on_callback_query(filters.regex(r"sub_unblock_[\s\S]*"))
async def callback_query_unblock(app: PyroClient, callback: CallbackQuery) -> None:
    """Handle a ``sub_unblock_<user_id>`` callback: unblock a user.

    The target user ID is the third ``_``-separated token of the callback
    data. The target is notified and unblocked, and the second keyboard row
    is replaced by a "block" button for the same user.

    Args:
        app: Bot client the handler is registered on.
        callback: Callback query produced by the pressed inline button.
    """
    user = await app.find_user(callback.from_user)
    fullcallback = str(callback.data).split("_")
    target_id = int(fullcallback[2])

    # The user to unblock is the one named in the callback data, not the
    # user who pressed the button.
    target = await app.find_user(target_id)

    await app.send_message(
        target_id,
        app._("sub_unblocked", "message", locale=target.locale),
    )
    await target.unblock()

    await callback.answer(
        text=app._("sub_unblock", "callback", locale=user.locale).format(
            fullcallback[2]
        ),
        show_alert=True,
    )

    # Keep the first row as is and offer the opposite action in the second.
    edited_markup = [
        callback.message.reply_markup.inline_keyboard[0],
        [
            InlineKeyboardButton(
                text=str(app._("sub_block", "button", locale=user.locale)),
                callback_data=f"sub_block_{fullcallback[2]}",
            )
        ],
    ]

    await callback.message.edit_reply_markup(
        reply_markup=InlineKeyboardMarkup(edited_markup)
    )

    logger.info("User %s has been unblocked", fullcallback[2])