import logging
from datetime import datetime
from os import makedirs, path, sep
from pathlib import Path
from traceback import format_exc
from uuid import uuid4

from pyrogram import filters
from pyrogram.client import Client
from pyrogram.enums.chat_action import ChatAction
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message

from classes.enums.submission_types import SubmissionType
from classes.exceptions import SubmissionDuplicatesError, SubmissionUnsupportedError
from classes.pyroclient import PyroClient
from modules import custom_filters
from modules.database import col_submitted
from modules.utils import USERS_WITH_CONTEXT

logger = logging.getLogger(__name__)


@Client.on_message(
    custom_filters.mode_submit
    & ~filters.scheduled
    & filters.private
    # The media alternatives must be grouped explicitly: "&" binds tighter
    # than "|", so without the parentheses videos and documents would match
    # regardless of the three preceding conditions.
    & (
        filters.photo
        | filters.video
        # | filters.animation
        | filters.document
    )
)
async def get_submission(app: PyroClient, message: Message) -> None:
    """Handle a photo, video or document sent to the bot as a submission.

    Steps, in order:

    1. Ignore the message when the sender has no ``id``, is listed in
       ``USERS_WITH_CONTEXT``, or is banned.
    2. Reply with the cooldown notice and stop when ``user.is_limited()``.
    3. Validate the media against ``app.config["submission"]`` (allowed
       mime types for documents, ``file_size`` upper bound for documents and
       videos) and decide whether a temporary copy is downloaded
       (files larger than ``tmp_size`` are not).
    4. Insert a record into ``col_submitted``.
    5. Either submit immediately (when ``require_confirmation`` is ``False``
       for the sender's role) or forward the message to ``app.owner`` with
       inline approve/decline buttons.

    Args:
        app: The running client; provides config, localisation (``app._``),
            user lookup and the submit methods.
        message: The incoming message carrying the media.
    """
    if not hasattr(message.from_user, "id"):
        return

    if message.from_user.id in USERS_WITH_CONTEXT:
        return

    user = await app.find_user(message.from_user)
    user_owner = await app.find_user(app.owner)

    try:
        if user.banned:
            return

        await app.send_chat_action(message.chat.id, ChatAction.TYPING)

        submission_config = app.config["submission"]

        save_tmp = True
        contents = None

        if await user.is_limited():
            await message.reply_text(
                app._("sub_cooldown", "message", locale=user.locale).format(
                    submission_config["timeout"]
                )
            )
            return

        if message.document is not None:
            logger.info(
                "User %s is trying to submit a file of type '%s' with name '%s' and size of %s MB",
                message.from_user.id,
                message.document.mime_type,
                message.document.file_name,
                message.document.file_size / 1024 / 1024,
            )

            if message.document.mime_type not in submission_config["mime_types"]:
                await message.reply_text(
                    app._("mime_not_allowed", "message", locale=user.locale).format(
                        ", ".join(submission_config["mime_types"])
                    ),
                    quote=True,
                )
                return

            if message.document.file_size > submission_config["file_size"]:
                await message.reply_text(
                    app._("document_too_large", "message", locale=user.locale).format(
                        submission_config["file_size"] / 1024 / 1024
                    ),
                    quote=True,
                )
                return

            if message.document.file_size > submission_config["tmp_size"]:
                save_tmp = False

            contents = (
                message.document.file_id,
                SubmissionType.DOCUMENT,
            )  # , message.document.file_name

        if message.video is not None:
            logger.info(
                "User %s is trying to submit a video with name '%s' and size of %s MB",
                message.from_user.id,
                message.video.file_name,
                message.video.file_size / 1024 / 1024,
            )

            if message.video.file_size > submission_config["file_size"]:
                await message.reply_text(
                    app._("document_too_large", "message", locale=user.locale).format(
                        submission_config["file_size"] / 1024 / 1024
                    ),
                    quote=True,
                )
                return

            if message.video.file_size > submission_config["tmp_size"]:
                save_tmp = False

            contents = (
                message.video.file_id,
                SubmissionType.VIDEO,
            )  # , message.video.file_name

        # Animation support is intentionally disabled (see the filter above).
        # if message.animation is not None:
        #     logger.info(
        #         "User %s is trying to submit an animation with name '%s' and size of %s MB",
        #         message.from_user.id,
        #         message.animation.file_name,
        #         message.animation.file_size / 1024 / 1024,
        #     )
        #     if message.animation.file_size > app.config["submission"]["file_size"]:
        #         await message.reply_text(
        #             app._("document_too_large", "message", locale=user.locale).format(
        #                 str(app.config["submission"]["file_size"] / 1024 / 1024)
        #             ),
        #             quote=True,
        #         )
        #         return
        #     if message.animation.file_size > app.config["submission"]["tmp_size"]:
        #         save_tmp = False
        #     contents = (
        #         message.animation.file_id,
        #         SubmissionType.ANIMATION,
        #     )  # , message.animation.file_name

        if message.photo is not None:
            logger.info(
                "User %s is trying to submit a photo with ID '%s' and size of %s MB",
                message.from_user.id,
                message.photo.file_id,
                message.photo.file_size / 1024 / 1024,
            )

            contents = (
                message.photo.file_id,
                SubmissionType.PHOTO,
            )  # , "please_generate"

        if contents is None:
            return

        file_id, submission_type = contents

        # `save_tmp` is a plain bool, so it must be tested for truthiness.
        # The previous `is not None` check was always true, which downloaded
        # files above `tmp_size` and left the no-temp branch unreachable.
        if save_tmp:
            tmp_id = str(uuid4())
            # filename = tmp_id if contents[1] == "please_generate" else contents[1]
            tmp_dir = Path(f"{app.config['locations']['data']}/submissions/{tmp_id}")
            makedirs(tmp_dir, exist_ok=True)
            # The destination ends with a path separator, as in the original
            # call, so it is passed as a directory rather than a file name.
            downloaded = await app.download_media(message, str(tmp_dir) + sep)
            temp = {"uuid": tmp_id, "file": path.basename(str(downloaded))}
        else:
            temp = {"uuid": None, "file": None}

        inserted = await col_submitted.insert_one(
            {
                "user": message.from_user.id,
                "date": datetime.now(),
                "done": False,
                "type": submission_type.value,
                "temp": temp,
                "telegram": {"msg_id": message.id, "file_id": file_id},
                "caption": str(message.caption)
                if message.caption is not None
                else None,
            }
        )
        submission_id = str(inserted.inserted_id)

        buttons = [
            [
                InlineKeyboardButton(
                    text=app._("sub_yes", "button", locale=user_owner.locale),
                    callback_data=f"sub_yes_{submission_id}",
                )
            ]
        ]

        if message.caption is not None:
            caption = str(message.caption)
            buttons[0].append(
                InlineKeyboardButton(
                    text=app._("sub_yes_caption", "button", locale=user_owner.locale),
                    callback_data=f"sub_yes_{submission_id}_caption",
                )
            )
        else:
            caption = ""

        buttons[0].append(
            InlineKeyboardButton(
                text=app._("sub_no", "button", locale=user_owner.locale),
                callback_data=f"sub_no_{submission_id}",
            )
        )

        # Append the sender's identity to the caption shown to the owner.
        caption += app._("sub_by", "message", locale=user_owner.locale)

        if message.from_user.first_name is not None:
            caption += f" {message.from_user.first_name}"
        if message.from_user.last_name is not None:
            caption += f" {message.from_user.last_name}"
        if message.from_user.username is not None:
            caption += f" (@{message.from_user.username})"
        if message.from_user.phone_number is not None:
            caption += f" ({message.from_user.phone_number})"

        is_admin = message.from_user.id in app.admins
        require_confirmation = submission_config["require_confirmation"]

        if is_admin and require_confirmation["admins"] is False:
            try:
                submitted = await app.submit_media(submission_id)
                await message.reply_text(
                    app._("sub_yes_auto", "message", locale=user.locale),
                    disable_notification=True,
                    quote=True,
                )
                if submission_config["send_uploaded_id"]:
                    caption += f"\n\nID: `{submitted[1]}`"
                await message.copy(
                    app.owner, caption=caption, disable_notification=True
                )
                return
            except SubmissionUnsupportedError:
                await message.reply_text(
                    app._("mime_not_allowed", "message", locale=user.locale).format(
                        ", ".join(submission_config["mime_types"])
                    ),
                    quote=True,
                )
                return
            except SubmissionDuplicatesError as exc:
                await message.reply_text(
                    app._(
                        "sub_media_duplicates_list", "message", locale=user.locale
                    ).format("\n • ".join(exc.duplicates)),
                    quote=True,
                )
                return
            except Exception as exc:
                # The text argument must be a string, not the exception object.
                await message.reply_text(str(exc), quote=True)
                return

        elif not is_admin and require_confirmation["users"] is False:
            try:
                # NOTE(review): the admin path above calls `submit_media`
                # while this path calls `submit_photo` — confirm both exist on
                # PyroClient and that the difference is intended.
                submitted = await app.submit_photo(submission_id)
                await message.reply_text(
                    app._("sub_yes_auto", "message", locale=user.locale),
                    disable_notification=True,
                    quote=True,
                )
                if submission_config["send_uploaded_id"]:
                    caption += f"\n\nID: `{submitted[1]}`"
                await message.copy(app.owner, caption=caption)
                return
            except SubmissionUnsupportedError:
                # `quote=True` belongs to reply_text(); it used to be passed
                # to str.format(), where it was silently ignored.
                await message.reply_text(
                    app._("mime_not_allowed", "message", locale=user.locale).format(
                        ", ".join(submission_config["mime_types"])
                    ),
                    quote=True,
                )
                return
            except SubmissionDuplicatesError:
                await message.reply_text(
                    app._("sub_dup", "message", locale=user.locale), quote=True
                )
                return
            except Exception:
                await app.send_message(
                    app.owner,
                    app._(
                        "sub_error_admin", "message", locale=user_owner.locale
                    ).format(message.from_user.id, format_exc()),
                )
                # Send the localised text instead of the raw key.
                # NOTE(review): assumes a "sub_error" message key exists next
                # to "sub_error_admin" — confirm in the locale files.
                await message.reply_text(
                    app._("sub_error", "message", locale=user.locale), quote=True
                )
                return

        if not is_admin:
            buttons.append(
                [
                    InlineKeyboardButton(
                        text=app._("sub_block", "button", locale=user_owner.locale),
                        callback_data=f"sub_block_{message.from_user.id}",
                    )
                ]
            )

        await user.update_cooldown()

        if message.from_user.id != app.owner:
            await message.reply_text(
                app._("sub_sent", "message", locale=user.locale),
                disable_notification=True,
                quote=True,
            )

        await message.copy(
            app.owner, caption=caption, reply_markup=InlineKeyboardMarkup(buttons)
        )

    except AttributeError:
        # This handler wraps the whole body, so include the traceback: the
        # AttributeError may originate from something other than `from_user`.
        logger.error("'from_user' does not seem to contain 'id'", exc_info=True)