This repository has been archived on 2024-10-15. You can view files and clone it, but cannot push or open issues or pull requests.
PyrogramBotBase/classes/pyroclient.py

268 lines
9.5 KiB
Python
Raw Normal View History

2023-05-26 17:32:56 +03:00
import logging
from datetime import datetime, timedelta
from os import getpid
2023-06-21 14:44:26 +03:00
from pathlib import Path
2023-05-26 17:32:56 +03:00
from time import time
from typing import List, Union
import pyrogram
2023-06-20 13:49:38 +03:00
from libbot.i18n import BotLocale
2023-05-26 17:32:56 +03:00
from libbot.i18n.sync import _
from pyrogram.client import Client
from pyrogram.errors import BadRequest
from pyrogram.handlers.message_handler import MessageHandler
from pyrogram.raw.all import layer
from pyrogram.types import (
BotCommand,
BotCommandScopeAllChatAdministrators,
BotCommandScopeAllGroupChats,
BotCommandScopeAllPrivateChats,
BotCommandScopeChat,
BotCommandScopeChatAdministrators,
BotCommandScopeChatMember,
BotCommandScopeDefault,
)
from ujson import dumps, loads
from classes.commandset import CommandSet
from classes.pyrocommand import PyroCommand
from modules.scheduler import scheduler
logger = logging.getLogger(__name__)
class PyroClient(Client):
    """Pyrogram client configured from `config.json` with locale and bot command helpers

    The constructor reads `config.json` from the current working directory,
    forwards the `bot` section to `pyrogram.client.Client` and exposes the
    locale helpers of `libbot.i18n.BotLocale` as instance attributes.
    """

    def __init__(self):
        # The config is loaded before calling the parent constructor because
        # the parent's arguments are taken from it
        with open("config.json", "r", encoding="utf-8") as f:
            self.config: dict = loads(f.read())

        super().__init__(
            name="bot_client",
            api_id=self.config["bot"]["api_id"],
            api_hash=self.config["bot"]["api_hash"],
            bot_token=self.config["bot"]["bot_token"],
            # Workers should be commented out when using convopyro, otherwise
            # handlers land in another event loop and you won't see them
            workers=self.config["bot"]["workers"],
            plugins=dict(root="plugins", exclude=self.config["disabled_plugins"]),
            sleep_threshold=120,
            max_concurrent_transmissions=self.config["bot"][
                "max_concurrent_transmissions"
            ],
        )

        self.owner: int = self.config["bot"]["owner"]
        self.commands: List[PyroCommand] = []
        self.scoped_commands: bool = self.config["bot"]["scoped_commands"]
        # Set to the current UNIX time in start()
        self.start_time: float = 0

        self.bot_locale: BotLocale = BotLocale(Path(self.config["locations"]["locale"]))
        self.default_locale: str = self.bot_locale.default
        self.locales: dict = self.bot_locale.locales

        # Shortcuts to the locale helpers
        self._ = self.bot_locale._
        self.in_all_locales = self.bot_locale.in_all_locales
        self.in_every_locale = self.bot_locale.in_every_locale

    async def start(self):
        """Start the client, notify the report chat and schedule commands registration

        Commands registration is scheduled to run 5 seconds after the start
        with the result of `collect_commands()`.
        """
        await super().start()

        self.start_time = time()

        logger.info(
            "Bot is running with Pyrogram v%s (Layer %s) and has started as @%s on PID %s.",
            pyrogram.__version__,
            layer,
            self.me.username,
            getpid(),
        )

        # Only the report message is guarded: a failure to deliver it must not
        # prevent the commands from being registered or the scheduler from starting
        try:
            await self.send_message(
                chat_id=self.config["reports"]["chat_id"],
                text=f"Bot started PID `{getpid()}`",
            )
        except BadRequest:
            logger.warning("Unable to send message to report chat.")

        scheduler.add_job(
            self.register_commands,
            trigger="date",
            run_date=datetime.now() + timedelta(seconds=5),
            kwargs={"command_sets": await self.collect_commands()},
        )
        scheduler.start()

    async def stop(self):
        """Notify the report chat (best effort) and stop the client"""
        try:
            await self.send_message(
                chat_id=self.config["reports"]["chat_id"],
                text=f"Bot stopped with PID `{getpid()}`",
            )
        except BadRequest:
            logger.warning("Unable to send message to report chat.")

        await super().stop()

        logger.warning("Bot stopped with PID %s.", getpid())

    async def collect_commands(self) -> Union[List[CommandSet], None]:
        """Gather list of the bot's commands

        If config's bot.scoped_commands is true, command sets are built from
        the "commands" section of the config. Otherwise commands found in the
        message handlers' filters are added to `self.commands`.

        ### Returns:
            * `List[CommandSet]`: List of the commands' sets (scoped mode)
            * `None`: When scoped commands are disabled
        """
        if not self.scoped_commands:
            self._collect_handler_commands()
            return None

        command_sets = self._collect_scoped_command_sets()

        logger.info("Registering the following command sets: %s", command_sets)

        return command_sets

    def _collect_scoped_command_sets(self) -> List[CommandSet]:
        """Build one `CommandSet` per (scope, locale) pair described in the config"""
        # Maps a scope (serialized to a string so it can be used as a key)
        # to a dict of locale -> commands. Key "_" holds the default locale
        scopes: dict = {}

        # Iterate through all commands in config
        for command, contents in self.config["commands"].items():
            # Iterate through all scopes of a command
            for scope in contents["scopes"]:
                by_locale = scopes.setdefault(dumps(scope), {"_": []})

                by_locale["_"].append(BotCommand(command, _(command, "commands")))

                for locale, string in (
                    self.in_every_locale(command, "commands")
                ).items():
                    by_locale.setdefault(locale, []).append(
                        BotCommand(command, string)
                    )

        command_sets: List[CommandSet] = []

        # Iterate through all scopes and its commands
        for scope, locales in scopes.items():
            # Make flat key a dict again
            scope_dict = loads(scope)

            # Replace "owner" in the bot scope with owner's id
            if "chat_id" in scope_dict and scope_dict["chat_id"] == "owner":
                scope_dict["chat_id"] = self.owner

            scope_obj = self._build_scope(scope_dict)

            # Invalid scopes are logged by _build_scope() and skipped
            if scope_obj is None:
                continue

            # Add set of commands to the list of the command sets
            for locale, commands in locales.items():
                command_sets.append(
                    CommandSet(
                        commands,
                        scope=scope_obj,
                        # Key "_" is registered without a language code
                        language_code="" if locale == "_" else locale,
                    )
                )

        return command_sets

    @staticmethod
    def _build_scope(scope_dict: dict):
        """Create a scope object from its config description

        ### Args:
            * scope_dict (`dict`): Scope class name under the key "name" and its constructor arguments under the other keys

        ### Returns:
            * Scope object, or `None` if the name or the arguments are invalid
        """
        # Only the scope classes imported by this module may be instantiated,
        # so the config cannot call arbitrary module globals
        scope_classes = {
            scope_class.__name__: scope_class
            for scope_class in (
                BotCommandScopeAllChatAdministrators,
                BotCommandScopeAllGroupChats,
                BotCommandScopeAllPrivateChats,
                BotCommandScopeChat,
                BotCommandScopeChatAdministrators,
                BotCommandScopeChatMember,
                BotCommandScopeDefault,
            )
        }

        scope_name = scope_dict.get("name")
        scope_class = (
            scope_classes.get(scope_name) if isinstance(scope_name, str) else None
        )

        if scope_class is None:
            logger.error(
                "Could not register commands of the scope '%s' due to an invalid scope class provided!",
                scope_name,
            )
            return None

        # Create object with the args from the dict
        try:
            return scope_class(
                **{key: value for key, value in scope_dict.items() if key != "name"}
            )
        except TypeError:
            logger.error(
                "Could not register commands of the scope '%s' due to an invalid class arguments provided!",
                scope_name,
            )
            return None

    def _collect_handler_commands(self) -> None:
        """Add commands found in the filters of group 0 message handlers to `self.commands`"""
        # Group 0 does not exist when no handler has been registered in it
        for handler in self.dispatcher.groups.get(0, []):
            if not isinstance(handler, MessageHandler):
                continue

            handler_filter = handler.filters

            # The filter itself and both sides of a combined filter are
            # inspected. Missing attributes resolve to None, which has no
            # "commands" attribute and is therefore skipped
            for entry in (
                handler_filter,
                getattr(handler_filter, "base", None),
                getattr(handler_filter, "other", None),
            ):
                if hasattr(entry, "commands"):
                    for command in entry.commands:
                        logger.info("I see a command %s in my filters", command)
                        self.add_command(command)

    def add_command(
        self,
        command: str,
    ):
        """Add command to the bot's internal commands list

        The description is looked up with `_(command, "commands")`.

        ### Args:
            * command (`str`): Name of the command
        """
        self.commands.append(
            PyroCommand(
                command,
                _(command, "commands"),
            )
        )

        logger.info(
            "Added command '%s' to the bot's internal commands list",
            command,
        )

    async def register_commands(
        self, command_sets: Union[List[CommandSet], None] = None
    ):
        """Register the bot's commands

        ### Args:
            * command_sets (`Union[List[CommandSet], None]`, *optional*): Command sets to register. If `None`, commands stored in bot's 'commands' attribute are registered without an explicit scope. Defaults to `None`.
        """
        if command_sets is None:
            commands = [
                BotCommand(command=command.command, description=command.description)
                for command in self.commands
            ]

            logger.info(
                "Registering commands %s with a default scope 'BotCommandScopeDefault'",
                commands,
            )

            await self.set_bot_commands(commands)
            return

        for command_set in command_sets:
            logger.info(
                "Registering command set with commands %s and scope '%s' (%s)",
                command_set.commands,
                command_set.scope,
                command_set.language_code,
            )

            await self.set_bot_commands(
                command_set.commands,
                command_set.scope,
                language_code=command_set.language_code,
            )

    async def remove_commands(self, command_sets: Union[List[CommandSet], None] = None):
        """Remove the bot's commands

        ### Args:
            * command_sets (`Union[List[CommandSet], None]`, *optional*): Command sets whose scope and language code should be cleared. If `None`, commands of the scope 'BotCommandScopeDefault' are removed. Defaults to `None`.
        """
        if command_sets is None:
            logger.info(
                "Removing commands with a default scope 'BotCommandScopeDefault'"
            )

            await self.delete_bot_commands(BotCommandScopeDefault())
            return

        for command_set in command_sets:
            logger.info(
                "Removing command set with scope '%s' (%s)",
                command_set.scope,
                command_set.language_code,
            )

            await self.delete_bot_commands(
                command_set.scope,
                language_code=command_set.language_code,
            )