import logging from datetime import datetime from os import makedirs, path, sep from pathlib import Path from traceback import format_exc from uuid import uuid4 from pyrogram import filters from pyrogram.client import Client from pyrogram.enums.chat_action import ChatAction from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message from classes.enums.submission_types import SubmissionType from classes.exceptions import SubmissionDuplicatesError, SubmissionUnsupportedError from classes.pyroclient import PyroClient from modules.database import col_banned, col_submitted from modules.utils import USERS_WITH_CONTEXT logger = logging.getLogger(__name__) @Client.on_message( ~filters.scheduled & filters.private & filters.photo | filters.video # | filters.animation | filters.document ) async def get_submission(app: PyroClient, msg: Message): global USERS_WITH_CONTEXT if not hasattr(msg.from_user, "id"): return if msg.from_user.id in USERS_WITH_CONTEXT: return user = await app.find_user(msg.from_user) user_owner = await app.find_user(app.owner) try: if col_banned.find_one({"user": msg.from_user.id}) is not None: return await app.send_chat_action(msg.chat.id, ChatAction.TYPING) save_tmp = True contents = None if await user.is_limited(): await msg.reply_text( app._("sub_cooldown", "message", locale=user.locale).format( app.config["submission"]["timeout"] ) ) return if msg.document is not None: logger.info( "User %s is trying to submit a file of type '%s' with name '%s' and size of %s MB", msg.from_user.id, msg.document.mime_type, msg.document.file_name, msg.document.file_size / 1024 / 1024, ) if msg.document.mime_type not in app.config["submission"]["mime_types"]: await msg.reply_text( app._("mime_not_allowed", "message", locale=user.locale).format( ", ".join(app.config["submission"]["mime_types"]) ), quote=True, ) return if msg.document.file_size > app.config["submission"]["file_size"]: await msg.reply_text( app._("document_too_large", "message", locale=user.locale).format( app.config["submission"]["file_size"] / 1024 / 1024 ), quote=True, ) return if msg.document.file_size > app.config["submission"]["tmp_size"]: save_tmp = False contents = ( msg.document.file_id, SubmissionType.DOCUMENT, ) # , msg.document.file_name if msg.video is not None: logger.info( "User %s is trying to submit a video with name '%s' and size of %s MB", msg.from_user.id, msg.video.file_name, msg.video.file_size / 1024 / 1024, ) if msg.video.file_size > app.config["submission"]["file_size"]: await msg.reply_text( app._("document_too_large", "message", locale=user.locale).format( app.config["submission"]["file_size"] / 1024 / 1024 ), quote=True, ) return if msg.video.file_size > app.config["submission"]["tmp_size"]: save_tmp = False contents = msg.video.file_id, SubmissionType.VIDEO # , msg.video.file_name # if msg.animation is not None: # logger.info( # "User %s is trying to submit an animation with name '%s' and size of %s MB", # msg.from_user.id, # msg.animation.file_name, # msg.animation.file_size / 1024 / 1024, # ) # if msg.animation.file_size > app.config["submission"]["file_size"]: # await msg.reply_text( # app._("document_too_large", "message", locale=user.locale).format( # str(app.config["submission"]["file_size"] / 1024 / 1024) # ), # quote=True, # ) # return # if msg.animation.file_size > app.config["submission"]["tmp_size"]: # save_tmp = False # contents = ( # msg.animation.file_id, # SubmissionType.ANIMATION, # ) # , msg.animation.file_name if msg.photo is not None: logger.info( "User %s is trying to submit a photo with ID '%s' and size of %s MB", msg.from_user.id, msg.photo.file_id, msg.photo.file_size / 1024 / 1024, ) contents = msg.photo.file_id, SubmissionType.PHOTO # , "please_generate" if contents is None: return if save_tmp is not None: tmp_id = str(uuid4()) # filename = tmp_id if contents[1] == "please_generate" else contents[1] makedirs( Path(f"{app.config['locations']['data']}/submissions/{tmp_id}"), exist_ok=True, ) downloaded = await app.download_media( msg, str(Path(f"{app.config['locations']['data']}/submissions/{tmp_id}")) + sep, ) inserted = col_submitted.insert_one( { "user": msg.from_user.id, "date": datetime.now(), "done": False, "type": contents[1].value, "temp": {"uuid": tmp_id, "file": path.basename(str(downloaded))}, "telegram": {"msg_id": msg.id, "file_id": contents[0]}, "caption": str(msg.caption) if msg.caption is not None else None, } ) else: inserted = col_submitted.insert_one( { "user": msg.from_user.id, "date": datetime.now(), "done": False, "type": contents[1].value, "temp": {"uuid": None, "file": None}, "telegram": {"msg_id": msg.id, "file_id": contents[0]}, "caption": str(msg.caption) if msg.caption is not None else None, } ) buttons = [ [ InlineKeyboardButton( text=app._("sub_yes", "button", locale=user_owner.locale), callback_data=f"sub_yes_{str(inserted.inserted_id)}", ) ] ] if msg.caption is not None: caption = str(msg.caption) buttons[0].append( InlineKeyboardButton( text=app._("sub_yes_caption", "button", locale=user_owner.locale), callback_data=f"sub_yes_{str(inserted.inserted_id)}_caption", ) ) else: caption = "" buttons[0].append( InlineKeyboardButton( text=app._("sub_no", "button", locale=user_owner.locale), callback_data=f"sub_no_{str(inserted.inserted_id)}", ) ) caption += app._("sub_by", "message", locale=user_owner.locale) if msg.from_user.first_name is not None: caption += f" {msg.from_user.first_name}" if msg.from_user.last_name is not None: caption += f" {msg.from_user.last_name}" if msg.from_user.username is not None: caption += f" (@{msg.from_user.username})" if msg.from_user.phone_number is not None: caption += f" ({msg.from_user.phone_number})" if ( msg.from_user.id in app.admins and app.config["submission"]["require_confirmation"]["admins"] is False ): try: submitted = await app.submit_media(str(inserted.inserted_id)) await msg.reply_text( app._("sub_yes_auto", "message", locale=user.locale), disable_notification=True, quote=True, ) if app.config["submission"]["send_uploaded_id"]: caption += f"\n\nID: `{submitted[1]}`" await msg.copy(app.owner, caption=caption, disable_notification=True) return except SubmissionUnsupportedError: await msg.reply_text( app._("mime_not_allowed", "message", locale=user.locale).format( ", ".join(app.config["submission"]["mime_types"]), quote=True ), quote=True, ) return except SubmissionDuplicatesError as exp: await msg.reply_text( app._( "sub_media_duplicates_list", "message", locale=user.locale ).format("\n • ".join(exp.duplicates)), quote=True, ) return except Exception as exp: await msg.reply_text(format_exc(), quote=True) return elif ( msg.from_user.id not in app.admins and app.config["submission"]["require_confirmation"]["users"] is False ): try: submitted = await app.submit_photo(str(inserted.inserted_id)) await msg.reply_text( app._("sub_yes_auto", "message", locale=user.locale), disable_notification=True, quote=True, ) if app.config["submission"]["send_uploaded_id"]: caption += f"\n\nID: `{submitted[1]}`" await msg.copy(app.owner, caption=caption) return except SubmissionUnsupportedError: await msg.reply_text( app._("mime_not_allowed", "message", locale=user.locale).format( ", ".join(app.config["submission"]["mime_types"]), quote=True ) ) return except SubmissionDuplicatesError as exp: await msg.reply_text( app._("sub_dup", "message", locale=user.locale), quote=True ) return except Exception as exp: await app.send_message( app.owner, app._( "sub_error_admin", "message", locale=user_owner.locale ).format(msg.from_user.id, format_exc()), ) await msg.reply_text("sub_error", quote=True) return if msg.from_user.id not in app.admins: buttons += [ [ InlineKeyboardButton( text=app._("sub_block", "button", locale=user_owner.locale), callback_data=f"sub_block_{msg.from_user.id}", ) ] ] await user.update_cooldown() if msg.from_user.id != app.owner: await msg.reply_text( app._("sub_sent", "message", locale=user.locale), disable_notification=True, quote=True, ) await msg.copy( app.owner, caption=caption, reply_markup=InlineKeyboardMarkup(buttons) ) except AttributeError: logger.error("'from_user' does not seem to contain 'id'")