This repository has been archived on 2024-10-15. You can view files and clone it, but cannot push or open issues or pull requests.
PyrogramBotBase/classes/pyroclient.py
2023-05-26 16:32:56 +02:00

248 lines
8.6 KiB
Python

import logging
from datetime import datetime, timedelta
from os import getpid
from time import time
from typing import List, Union
import pyrogram
from libbot import config_get
from libbot.i18n import in_every_locale
from libbot.i18n.sync import _
from pyrogram.client import Client
from pyrogram.errors import BadRequest
from pyrogram.handlers.message_handler import MessageHandler
from pyrogram.raw.all import layer
from pyrogram.types import (
BotCommand,
BotCommandScopeAllChatAdministrators,
BotCommandScopeAllGroupChats,
BotCommandScopeAllPrivateChats,
BotCommandScopeChat,
BotCommandScopeChatAdministrators,
BotCommandScopeChatMember,
BotCommandScopeDefault,
)
from ujson import dumps, loads
from classes.commandset import CommandSet
from classes.pyrocommand import PyroCommand
from modules.scheduler import scheduler
logger = logging.getLogger(__name__)
class PyroClient(Client):
def __init__(self):
with open("config.json", "r", encoding="utf-8") as f:
config = loads(f.read())
super().__init__(
name="bot_client",
api_id=config["bot"]["api_id"],
api_hash=config["bot"]["api_hash"],
bot_token=config["bot"]["bot_token"],
workers=config["bot"]["workers"],
plugins=dict(root="plugins", exclude=config["disabled_plugins"]),
sleep_threshold=120,
)
self.commands: List[PyroCommand] = []
self.scoped_commands = config["bot"]["scoped_commands"]
self.start_time = 0
async def start(self):
await super().start()
self.start_time = time()
logger.info(
"Bot is running with Pyrogram v%s (Layer %s) and has started as @%s on PID %s.",
pyrogram.__version__,
layer,
self.me.username,
getpid(),
)
try:
await self.send_message(
chat_id=await config_get("chat_id", "reports"),
text=f"Bot started PID `{getpid()}`",
)
scheduler.add_job(
self.register_commands,
trigger="date",
run_date=datetime.now() + timedelta(seconds=5),
kwargs={"command_sets": await self.collect_commands()},
)
scheduler.start()
except BadRequest:
logger.warning("Unable to send message to report chat.")
async def stop(self):
try:
await self.send_message(
chat_id=await config_get("chat_id", "reports"),
text=f"Bot stopped with PID `{getpid()}`",
)
except BadRequest:
logger.warning("Unable to send message to report chat.")
await super().stop()
logger.warning("Bot stopped with PID %s.", getpid())
async def collect_commands(self) -> Union[List[CommandSet], None]:
"""Gather list of the bot's commands
### Returns:
* `List[CommandSet]`: List of the commands' sets
"""
command_sets = None
# If config get bot.scoped_commands is true - more complicated
# scopes system will be used instead of simple global commands
if self.scoped_commands:
scopes = {}
command_sets = []
# Iterate through all commands in config
for command, contents in (await config_get("commands")).items():
# Iterate through all scopes of a command
for scope in contents["scopes"]:
if dumps(scope) not in scopes:
scopes[dumps(scope)] = {"_": []}
# Add command to the scope's flattened key in scopes dict
scopes[dumps(scope)]["_"].append(
BotCommand(command, _(command, "commands"))
)
for locale, string in (
await in_every_locale(command, "commands")
).items():
if locale not in scopes[dumps(scope)]:
scopes[dumps(scope)][locale] = []
scopes[dumps(scope)][locale].append(BotCommand(command, string))
# Iterate through all scopes and its commands
for scope, locales in scopes.items():
# Make flat key a dict again
scope_dict = loads(scope)
# Create object with the same name and args from the dict
try:
scope_obj = globals()[scope_dict["name"]](
**{
key: value
for key, value in scope_dict.items()
if key != "name"
}
)
except NameError:
logger.error(
"Could not register commands of the scope '%s' due to an invalid scope class provided!",
scope_dict["name"],
)
continue
except TypeError:
logger.error(
"Could not register commands of the scope '%s' due to an invalid class arguments provided!",
scope_dict["name"],
)
continue
# Add set of commands to the list of the command sets
for locale, commands in locales.items():
if locale == "_":
command_sets.append(
CommandSet(commands, scope=scope_obj, language_code="")
)
continue
command_sets.append(
CommandSet(commands, scope=scope_obj, language_code=locale)
)
logger.info("Registering the following command sets: %s", command_sets)
else:
# This part here looks into the handlers and looks for commands
# in it, if there are any. Then adds them to self.commands
for handler in self.dispatcher.groups[0]:
if isinstance(handler, MessageHandler):
for entry in [handler.filters.base, handler.filters.other]:
if hasattr(entry, "commands"):
for command in entry.commands:
logger.info("I see a command %s in my filters", command)
self.add_command(command)
return command_sets
def add_command(
self,
command: str,
):
"""Add command to the bot's internal commands list
### Args:
* command (`str`)
"""
self.commands.append(
PyroCommand(
command,
_(command, "commands"),
)
)
logger.info(
"Added command '%s' to the bot's internal commands list",
command,
)
async def register_commands(
self, command_sets: Union[List[CommandSet], None] = None
):
"""Register commands stored in bot's 'commands' attribute"""
if command_sets is None:
commands = [
BotCommand(command=command.command, description=command.description)
for command in self.commands
]
logger.info(
"Registering commands %s with a default scope 'BotCommandScopeDefault'"
)
await self.set_bot_commands(commands)
return
for command_set in command_sets:
await self.set_bot_commands(
command_set.commands,
command_set.scope,
language_code=command_set.language_code,
)
logger.info(
"Registering command set with commands %s and scope '%s'",
command_set.commands,
command_set.scope,
)
async def remove_commands(self, command_sets: Union[List[CommandSet], None] = None):
"""Remove commands stored in bot's 'commands' attribute"""
if command_sets is None:
logger.info(
"Removing commands with a default scope 'BotCommandScopeDefault'"
)
await self.delete_bot_commands(BotCommandScopeDefault())
return
for command_set in command_sets:
logger.info(
"Removing command set with scope '%s'",
command_set.scope,
)
await self.delete_bot_commands(
command_set.scope,
language_code=command_set.language_code,
)