import logging from datetime import datetime, timedelta from os import getpid from time import time from typing import List, Union import pyrogram from libbot import config_get from libbot.i18n import in_every_locale from libbot.i18n.sync import _ from pyrogram.client import Client from pyrogram.errors import BadRequest from pyrogram.handlers.message_handler import MessageHandler from pyrogram.raw.all import layer from pyrogram.types import ( BotCommand, BotCommandScopeAllChatAdministrators, BotCommandScopeAllGroupChats, BotCommandScopeAllPrivateChats, BotCommandScopeChat, BotCommandScopeChatAdministrators, BotCommandScopeChatMember, BotCommandScopeDefault, ) from ujson import dumps, loads from classes.commandset import CommandSet from classes.pyrocommand import PyroCommand from modules.scheduler import scheduler logger = logging.getLogger(__name__) class PyroClient(Client): def __init__(self): with open("config.json", "r", encoding="utf-8") as f: config = loads(f.read()) super().__init__( name="bot_client", api_id=config["bot"]["api_id"], api_hash=config["bot"]["api_hash"], bot_token=config["bot"]["bot_token"], workers=config["bot"]["workers"], plugins=dict(root="plugins", exclude=config["disabled_plugins"]), sleep_threshold=120, ) self.owner = config["bot"]["owner"] self.commands: List[PyroCommand] = [] self.scoped_commands = config["bot"]["scoped_commands"] self.start_time = 0 async def start(self): await super().start() self.start_time = time() logger.info( "Bot is running with Pyrogram v%s (Layer %s) and has started as @%s on PID %s.", pyrogram.__version__, layer, self.me.username, getpid(), ) try: await self.send_message( chat_id=await config_get("chat_id", "reports"), text=f"Bot started PID `{getpid()}`", ) scheduler.add_job( self.register_commands, trigger="date", run_date=datetime.now() + timedelta(seconds=5), kwargs={"command_sets": await self.collect_commands()}, ) scheduler.start() except BadRequest: logger.warning("Unable to send message to report chat.") async def stop(self): try: await self.send_message( chat_id=await config_get("chat_id", "reports"), text=f"Bot stopped with PID `{getpid()}`", ) except BadRequest: logger.warning("Unable to send message to report chat.") await super().stop() logger.warning("Bot stopped with PID %s.", getpid()) async def collect_commands(self) -> Union[List[CommandSet], None]: """Gather list of the bot's commands ### Returns: * `List[CommandSet]`: List of the commands' sets """ command_sets = None # If config get bot.scoped_commands is true - more complicated # scopes system will be used instead of simple global commands if self.scoped_commands: scopes = {} command_sets = [] # Iterate through all commands in config for command, contents in (await config_get("commands")).items(): # Iterate through all scopes of a command for scope in contents["scopes"]: if dumps(scope) not in scopes: scopes[dumps(scope)] = {"_": []} # Add command to the scope's flattened key in scopes dict scopes[dumps(scope)]["_"].append( BotCommand(command, _(command, "commands")) ) for locale, string in ( await in_every_locale(command, "commands") ).items(): if locale not in scopes[dumps(scope)]: scopes[dumps(scope)][locale] = [] scopes[dumps(scope)][locale].append(BotCommand(command, string)) # Iterate through all scopes and its commands for scope, locales in scopes.items(): # Make flat key a dict again scope_dict = loads(scope) # Create object with the same name and args from the dict try: scope_obj = globals()[scope_dict["name"]]( **{ key: value for key, value in scope_dict.items() if key != "name" } ) except NameError: logger.error( "Could not register commands of the scope '%s' due to an invalid scope class provided!", scope_dict["name"], ) continue except TypeError: logger.error( "Could not register commands of the scope '%s' due to an invalid class arguments provided!", scope_dict["name"], ) continue # Add set of commands to the list of the command sets for locale, commands in locales.items(): if locale == "_": command_sets.append( CommandSet(commands, scope=scope_obj, language_code="") ) continue command_sets.append( CommandSet(commands, scope=scope_obj, language_code=locale) ) logger.info("Registering the following command sets: %s", command_sets) else: # This part here looks into the handlers and looks for commands # in it, if there are any. Then adds them to self.commands for handler in self.dispatcher.groups[0]: if isinstance(handler, MessageHandler): for entry in [handler.filters.base, handler.filters.other]: if hasattr(entry, "commands"): for command in entry.commands: logger.info("I see a command %s in my filters", command) self.add_command(command) return command_sets def add_command( self, command: str, ): """Add command to the bot's internal commands list ### Args: * command (`str`) """ self.commands.append( PyroCommand( command, _(command, "commands"), ) ) logger.info( "Added command '%s' to the bot's internal commands list", command, ) async def register_commands( self, command_sets: Union[List[CommandSet], None] = None ): """Register commands stored in bot's 'commands' attribute""" if command_sets is None: commands = [ BotCommand(command=command.command, description=command.description) for command in self.commands ] logger.info( "Registering commands %s with a default scope 'BotCommandScopeDefault'" ) await self.set_bot_commands(commands) return for command_set in command_sets: logger.info( "Registering command set with commands %s and scope '%s' (%s)", command_set.commands, command_set.scope, command_set.language_code, ) await self.set_bot_commands( command_set.commands, command_set.scope, language_code=command_set.language_code, ) async def remove_commands(self, command_sets: Union[List[CommandSet], None] = None): """Remove commands stored in bot's 'commands' attribute""" if command_sets is None: logger.info( "Removing commands with a default scope 'BotCommandScopeDefault'" ) await self.delete_bot_commands(BotCommandScopeDefault()) return for command_set in command_sets: logger.info( "Removing command set with scope '%s' (%s)", command_set.scope, command_set.language_code, ) await self.delete_bot_commands( command_set.scope, language_code=command_set.language_code, )