Updated to 0.2.1
This commit is contained in:
6
src/libbot/__init__.py
Normal file
6
src/libbot/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
__version__ = "0.2.1"
|
||||
__license__ = "GPL3"
|
||||
__author__ = "Profitroll"
|
||||
|
||||
from . import i18n, pyrogram, sync
|
||||
from .__main__ import *
|
95
src/libbot/__main__.py
Normal file
95
src/libbot/__main__.py
Normal file
@@ -0,0 +1,95 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
|
||||
import aiofiles
|
||||
|
||||
try:
|
||||
from ujson import dumps, loads
|
||||
except ImportError:
|
||||
from json import dumps, loads
|
||||
|
||||
from libbot.sync import nested_set
|
||||
|
||||
|
||||
async def json_read(path: Union[str, Path]) -> Any:
|
||||
"""Read contents of a JSON file
|
||||
|
||||
### Args:
|
||||
* path (`Union[str, Path]`): Path-like object or path as a string
|
||||
|
||||
### Returns:
|
||||
* `Any`: File contents
|
||||
"""
|
||||
async with aiofiles.open(str(path), mode="r", encoding="utf-8") as f:
|
||||
data = await f.read()
|
||||
return loads(data)
|
||||
|
||||
|
||||
async def json_write(data: Any, path: Union[str, Path]) -> None:
|
||||
"""Write contents to a JSON file
|
||||
|
||||
### Args:
|
||||
* data (`Any`): Contents to write. Must be a JSON serializable
|
||||
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
|
||||
"""
|
||||
async with aiofiles.open(str(path), mode="w", encoding="utf-8") as f:
|
||||
await f.write(
|
||||
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
|
||||
if hasattr(dumps, "escape_forward_slashes")
|
||||
else dumps(data, ensure_ascii=False, indent=4)
|
||||
)
|
||||
|
||||
|
||||
async def config_get(
|
||||
key: str, *path: str, config_file: Union[str, Path] = "config.json"
|
||||
) -> Any:
|
||||
"""Get a value of the config key by its path provided
|
||||
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
|
||||
|
||||
### Args:
|
||||
* key (`str`): Key that contains the value
|
||||
* *path (`str`): Path to the key that contains the value
|
||||
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
|
||||
|
||||
### Returns:
|
||||
* `Any`: Key's value
|
||||
|
||||
### Example:
|
||||
Get the "salary" of "Pete" from this JSON structure:
|
||||
```json
|
||||
{
|
||||
"users": {
|
||||
"Pete": {
|
||||
"salary": 10.0
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This can be easily done with the following code:
|
||||
```python
|
||||
import libbot
|
||||
salary = await libbot.config_get("salary", "users", "Pete")
|
||||
```
|
||||
"""
|
||||
this_key = await json_read(config_file)
|
||||
for dict_key in path:
|
||||
this_key = this_key[dict_key]
|
||||
return this_key[key]
|
||||
|
||||
|
||||
async def config_set(
|
||||
key: str, value: Any, *path: str, config_file: Union[str, Path] = "config.json"
|
||||
) -> None:
|
||||
"""Set config's key by its path to the value
|
||||
|
||||
### Args:
|
||||
* key (`str`): Key that leads to the value
|
||||
* value (`Any`): Any JSON serializable data
|
||||
* *path (`str`): Path to the key of the target
|
||||
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
|
||||
"""
|
||||
await json_write(
|
||||
nested_set(await json_read(config_file), value, *(*path, key)), config_file
|
||||
)
|
||||
return
|
118
src/libbot/i18n/__init__.py
Normal file
118
src/libbot/i18n/__init__.py
Normal file
@@ -0,0 +1,118 @@
|
||||
from os import listdir
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
import libbot
|
||||
from libbot.i18n import sync
|
||||
from libbot.i18n.classes.bot_locale import BotLocale
|
||||
|
||||
|
||||
async def _(
|
||||
key: str,
|
||||
*args: str,
|
||||
locale: Union[str, None] = "en",
|
||||
locales_root: Union[str, Path] = Path("locale"),
|
||||
) -> Any:
|
||||
"""Get value of locale string
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locale (`Union[str, None]`): Locale to looked up in. Defaults to `"en"`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
|
||||
"""
|
||||
locale = libbot.sync.config_get("locale") if locale is None else locale
|
||||
|
||||
try:
|
||||
this_dict = await libbot.json_read(Path(f"{locales_root}/{locale}.json"))
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
this_dict = await libbot.json_read(
|
||||
Path(f'{locales_root}/{await libbot.config_get("locale")}.json')
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
return this_key[key]
|
||||
except KeyError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
|
||||
async def in_all_locales(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> list:
|
||||
"""Get value of the provided key and path in all available locales
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `list`: List of values in all locales
|
||||
"""
|
||||
|
||||
output = []
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = await libbot.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
output.append(this_key[key])
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return output
|
||||
|
||||
|
||||
async def in_every_locale(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> Dict[str, Any]:
|
||||
"""Get value of the provided key and path in every available locale with locale tag
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
|
||||
"""
|
||||
|
||||
output = {}
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = await libbot.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
output[lc] = this_key[key]
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return output
|
125
src/libbot/i18n/classes/bot_locale.py
Normal file
125
src/libbot/i18n/classes/bot_locale.py
Normal file
@@ -0,0 +1,125 @@
|
||||
from os import listdir
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from libbot import sync
|
||||
|
||||
|
||||
class BotLocale:
|
||||
"""Small addon that can be used by bot clients' classes in order to minimize I/O"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
default_locale: Union[str, None] = "en",
|
||||
locales_root: Union[str, Path] = Path("locale"),
|
||||
) -> None:
|
||||
if isinstance(locales_root, str):
|
||||
locales_root = Path(locales_root)
|
||||
elif not isinstance(locales_root, Path):
|
||||
raise TypeError("'locales_root' must be a valid path or path-like object")
|
||||
|
||||
files_locales: list = listdir(locales_root)
|
||||
|
||||
valid_locales: list = [
|
||||
".".join(entry.split(".")[:-1]) for entry in files_locales
|
||||
]
|
||||
|
||||
self.default: str = (
|
||||
sync.config_get("locale") if default_locale is None else default_locale
|
||||
)
|
||||
self.locales: dict = {}
|
||||
|
||||
for lc in valid_locales:
|
||||
self.locales[lc] = sync.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
|
||||
def _(self, key: str, *args: str, locale: Union[str, None] = None) -> Any:
|
||||
"""Get value of locale string
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path
|
||||
* *args (`list`): Path to key like: `dict[args][key]`
|
||||
* locale (`Union[str, None]`, *optional*): Locale to looked up in. Defaults to config's `"locale"` value
|
||||
|
||||
### Returns:
|
||||
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
|
||||
"""
|
||||
|
||||
if locale is None:
|
||||
locale = self.default
|
||||
|
||||
try:
|
||||
this_dict = self.locales[locale]
|
||||
except KeyError:
|
||||
try:
|
||||
this_dict = self.locales[self.default]
|
||||
except KeyError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
return this_key[key]
|
||||
except KeyError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
def in_all_locales(self, key: str, *args: str) -> list:
|
||||
"""Get value of the provided key and path in all available locales
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
|
||||
### Returns:
|
||||
* `list`: List of values in all locales
|
||||
"""
|
||||
|
||||
output = []
|
||||
|
||||
for name, lc in self.locales.items():
|
||||
try:
|
||||
this_dict = lc
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
output.append(this_key[key])
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return output
|
||||
|
||||
def in_every_locale(self, key: str, *args: str) -> Dict[str, Any]:
|
||||
"""Get value of the provided key and path in every available locale with locale tag
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
|
||||
### Returns:
|
||||
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
|
||||
"""
|
||||
|
||||
output = {}
|
||||
|
||||
for name, lc in self.locales.items():
|
||||
try:
|
||||
this_dict = lc
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
output[name] = this_key[key]
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return output
|
117
src/libbot/i18n/sync/__init__.py
Normal file
117
src/libbot/i18n/sync/__init__.py
Normal file
@@ -0,0 +1,117 @@
|
||||
from os import listdir
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
import libbot
|
||||
|
||||
|
||||
def _(
|
||||
key: str,
|
||||
*args: str,
|
||||
locale: Union[str, None] = "en",
|
||||
locales_root: Union[str, Path] = Path("locale"),
|
||||
) -> Any:
|
||||
"""Get value of locale string
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locale (`Union[str, None]`): Locale to looked up in. Defaults to `"en"`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
|
||||
"""
|
||||
if locale is None:
|
||||
locale = libbot.sync.config_get("locale")
|
||||
|
||||
try:
|
||||
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{locale}.json"))
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
this_dict = libbot.sync.json_read(
|
||||
Path(f'{locales_root}/{libbot.sync.config_get("locale")}.json')
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
return this_key[key]
|
||||
except KeyError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
|
||||
def in_all_locales(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> list:
|
||||
"""Get value of the provided key and path in all available locales
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `list`: List of values in all locales
|
||||
"""
|
||||
|
||||
output = []
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
output.append(this_key[key])
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return output
|
||||
|
||||
|
||||
def in_every_locale(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> Dict[str, Any]:
|
||||
"""Get value of the provided key and path in every available locale with locale tag
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
|
||||
"""
|
||||
|
||||
output = {}
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
try:
|
||||
output[lc] = this_key[key]
|
||||
except KeyError:
|
||||
continue
|
||||
|
||||
return output
|
3
src/libbot/pyrogram/classes/__init__.py
Normal file
3
src/libbot/pyrogram/classes/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .client import PyroClient
|
||||
from .command import PyroCommand
|
||||
from .commandset import CommandSet
|
341
src/libbot/pyrogram/classes/client.py
Normal file
341
src/libbot/pyrogram/classes/client.py
Normal file
@@ -0,0 +1,341 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from os import cpu_count, getpid
|
||||
from pathlib import Path
|
||||
from time import time
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
try:
|
||||
import pyrogram
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.errors import BadRequest
|
||||
from pyrogram.handlers.message_handler import MessageHandler
|
||||
from pyrogram.raw.all import layer
|
||||
from pyrogram.types import (
|
||||
BotCommand,
|
||||
BotCommandScopeAllChatAdministrators,
|
||||
BotCommandScopeAllGroupChats,
|
||||
BotCommandScopeAllPrivateChats,
|
||||
BotCommandScopeChat,
|
||||
BotCommandScopeChatAdministrators,
|
||||
BotCommandScopeChatMember,
|
||||
BotCommandScopeDefault,
|
||||
)
|
||||
except ImportError as exc:
|
||||
raise ImportError(
|
||||
"You need to install libbot[pyrogram] in order to use this class."
|
||||
) from exc
|
||||
|
||||
try:
|
||||
from ujson import dumps, loads
|
||||
except ImportError:
|
||||
from json import dumps, loads
|
||||
|
||||
from libbot.i18n import BotLocale
|
||||
from libbot.i18n.sync import _
|
||||
from libbot.pyrogram.classes.command import PyroCommand
|
||||
from libbot.pyrogram.classes.commandset import CommandSet
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PyroClient(Client):
|
||||
def __init__(
|
||||
self,
|
||||
name: str = "bot_client",
|
||||
config: Union[Dict[str, Any], None] = None,
|
||||
config_path: Union[str, Path] = Path("config.json"),
|
||||
api_id: Union[int, None] = None,
|
||||
api_hash: Union[str, None] = None,
|
||||
bot_token: Union[str, None] = None,
|
||||
workers: int = min(32, cpu_count() + 4),
|
||||
locales_root: Union[str, Path, None] = None,
|
||||
plugins_root: str = "plugins",
|
||||
plugins_exclude: Union[List[str], None] = None,
|
||||
sleep_threshold: int = 120,
|
||||
max_concurrent_transmissions: int = 1,
|
||||
commands_source: Union[Dict[str, dict], None] = None,
|
||||
scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = None,
|
||||
**kwargs,
|
||||
):
|
||||
if config is None:
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
self.config: dict = loads(f.read())
|
||||
else:
|
||||
self.config = config
|
||||
|
||||
super().__init__(
|
||||
name=name,
|
||||
api_id=self.config["bot"]["api_id"] if api_id is None else api_id,
|
||||
api_hash=self.config["bot"]["api_hash"] if api_hash is None else api_hash,
|
||||
bot_token=self.config["bot"]["bot_token"]
|
||||
if bot_token is None
|
||||
else bot_token,
|
||||
# Workers should be `min(32, cpu_count() + 4)`, otherwise
|
||||
# handlers land in another event loop and you won't see them
|
||||
workers=self.config["bot"]["workers"]
|
||||
if "workers" in self.config["bot"]
|
||||
else workers,
|
||||
plugins=dict(
|
||||
root=plugins_root,
|
||||
exclude=self.config["disabled_plugins"]
|
||||
if plugins_exclude is None
|
||||
else plugins_exclude,
|
||||
),
|
||||
sleep_threshold=sleep_threshold,
|
||||
max_concurrent_transmissions=self.config["bot"][
|
||||
"max_concurrent_transmissions"
|
||||
]
|
||||
if "max_concurrent_transmissions" in self.config["bot"]
|
||||
else max_concurrent_transmissions,
|
||||
**kwargs,
|
||||
)
|
||||
self.owner: int = self.config["bot"]["owner"]
|
||||
self.commands: List[PyroCommand] = []
|
||||
self.commands_source: Dict[str, dict] = (
|
||||
self.config["commands"] if commands_source is None else commands_source
|
||||
)
|
||||
self.scoped_commands: bool = self.config["bot"]["scoped_commands"]
|
||||
self.start_time: float = 0
|
||||
|
||||
self.bot_locale: BotLocale = BotLocale(
|
||||
(Path("locale") if locales_root is None else locales_root)
|
||||
)
|
||||
self.default_locale: str = self.bot_locale.default
|
||||
self.locales: dict = self.bot_locale.locales
|
||||
|
||||
self._ = self.bot_locale._
|
||||
self.in_all_locales = self.bot_locale.in_all_locales
|
||||
self.in_every_locale = self.bot_locale.in_every_locale
|
||||
|
||||
self.scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = scheduler
|
||||
|
||||
self.scopes_placeholders: Dict[str, int] = {"owner": self.owner}
|
||||
|
||||
async def start(self, register_commands: bool = True) -> None:
|
||||
await super().start()
|
||||
|
||||
self.start_time = time()
|
||||
|
||||
logger.info(
|
||||
"Bot is running with Pyrogram v%s (Layer %s) and has started as @%s on PID %s.",
|
||||
pyrogram.__version__,
|
||||
layer,
|
||||
self.me.username,
|
||||
getpid(),
|
||||
)
|
||||
|
||||
try:
|
||||
await self.send_message(
|
||||
chat_id=self.owner
|
||||
if self.config["reports"]["chat_id"] == "owner"
|
||||
else self.config["reports"]["chat_id"],
|
||||
text=f"Bot started PID `{getpid()}`",
|
||||
)
|
||||
|
||||
if self.scheduler is None:
|
||||
return
|
||||
|
||||
if register_commands:
|
||||
self.scheduler.add_job(
|
||||
self.register_commands,
|
||||
trigger="date",
|
||||
run_date=datetime.now() + timedelta(seconds=5),
|
||||
kwargs={"command_sets": await self.collect_commands()},
|
||||
)
|
||||
|
||||
self.scheduler.start()
|
||||
except BadRequest:
|
||||
logger.warning("Unable to send message to report chat.")
|
||||
|
||||
async def stop(self, exit_completely: bool = True) -> None:
|
||||
try:
|
||||
await self.send_message(
|
||||
chat_id=self.owner
|
||||
if self.config["reports"]["chat_id"] == "owner"
|
||||
else self.config["reports"]["chat_id"],
|
||||
text=f"Bot stopped with PID `{getpid()}`",
|
||||
)
|
||||
await asyncio.sleep(0.5)
|
||||
except BadRequest:
|
||||
logger.warning("Unable to send message to report chat.")
|
||||
|
||||
await super().stop()
|
||||
logger.warning("Bot stopped with PID %s.", getpid())
|
||||
|
||||
if exit_completely:
|
||||
try:
|
||||
exit()
|
||||
except SystemExit as exp:
|
||||
raise SystemExit(
|
||||
"Bot has been shut down, this is not an application error!"
|
||||
) from exp
|
||||
|
||||
async def collect_commands(self) -> Union[List[CommandSet], None]:
|
||||
"""Gather list of the bot's commands
|
||||
|
||||
### Returns:
|
||||
* `List[CommandSet]`: List of the commands' sets
|
||||
"""
|
||||
command_sets = None
|
||||
|
||||
# If config's bot.scoped_commands is true - more complicated
|
||||
# scopes system will be used instead of simple global commands
|
||||
if self.scoped_commands:
|
||||
scopes = {}
|
||||
command_sets = []
|
||||
|
||||
# Iterate through all commands in config
|
||||
for command, contents in self.commands_source.items():
|
||||
# Iterate through all scopes of a command
|
||||
for scope in contents["scopes"]:
|
||||
if dumps(scope) not in scopes:
|
||||
scopes[dumps(scope)] = {"_": []}
|
||||
|
||||
# Add command to the scope's flattened key in scopes dict
|
||||
scopes[dumps(scope)]["_"].append(
|
||||
BotCommand(command, _(command, "commands"))
|
||||
)
|
||||
|
||||
for locale, string in (
|
||||
self.in_every_locale(command, "commands")
|
||||
).items():
|
||||
if locale not in scopes[dumps(scope)]:
|
||||
scopes[dumps(scope)][locale] = []
|
||||
|
||||
scopes[dumps(scope)][locale].append(BotCommand(command, string))
|
||||
|
||||
# Iterate through all scopes and its commands
|
||||
for scope, locales in scopes.items():
|
||||
# Make flat key a dict again
|
||||
scope_dict = loads(scope)
|
||||
|
||||
# Replace "owner" in the bot scope with owner's id
|
||||
for placeholder, chat_id in self.scopes_placeholders.items():
|
||||
if "chat_id" in scope_dict and scope_dict["chat_id"] == placeholder:
|
||||
scope_dict["chat_id"] = chat_id
|
||||
|
||||
# Create object with the same name and args from the dict
|
||||
try:
|
||||
scope_obj = globals()[scope_dict["name"]](
|
||||
**{
|
||||
key: value
|
||||
for key, value in scope_dict.items()
|
||||
if key != "name"
|
||||
}
|
||||
)
|
||||
except NameError:
|
||||
logger.error(
|
||||
"Could not register commands of the scope '%s' due to an invalid scope class provided!",
|
||||
scope_dict["name"],
|
||||
)
|
||||
continue
|
||||
except TypeError:
|
||||
logger.error(
|
||||
"Could not register commands of the scope '%s' due to an invalid class arguments provided!",
|
||||
scope_dict["name"],
|
||||
)
|
||||
continue
|
||||
|
||||
# Add set of commands to the list of the command sets
|
||||
for locale, commands in locales.items():
|
||||
if locale == "_":
|
||||
command_sets.append(
|
||||
CommandSet(commands, scope=scope_obj, language_code="")
|
||||
)
|
||||
continue
|
||||
command_sets.append(
|
||||
CommandSet(commands, scope=scope_obj, language_code=locale)
|
||||
)
|
||||
|
||||
logger.info("Registering the following command sets: %s", command_sets)
|
||||
|
||||
else:
|
||||
# This part here looks into the handlers and looks for commands
|
||||
# in it, if there are any. Then adds them to self.commands
|
||||
for handler in self.dispatcher.groups[0]:
|
||||
if isinstance(handler, MessageHandler):
|
||||
for entry in [handler.filters.base, handler.filters.other]:
|
||||
if hasattr(entry, "commands"):
|
||||
for command in entry.commands:
|
||||
logger.info("I see a command %s in my filters", command)
|
||||
self.add_command(command)
|
||||
|
||||
return command_sets
|
||||
|
||||
def add_command(
|
||||
self,
|
||||
command: str,
|
||||
) -> None:
|
||||
"""Add command to the bot's internal commands list
|
||||
|
||||
### Args:
|
||||
* command (`str`)
|
||||
"""
|
||||
self.commands.append(
|
||||
PyroCommand(
|
||||
command,
|
||||
_(command, "commands"),
|
||||
)
|
||||
)
|
||||
logger.info(
|
||||
"Added command '%s' to the bot's internal commands list",
|
||||
command,
|
||||
)
|
||||
|
||||
async def register_commands(
|
||||
self, command_sets: Union[List[CommandSet], None] = None
|
||||
) -> None:
|
||||
"""Register commands stored in bot's 'commands' attribute"""
|
||||
|
||||
if command_sets is None:
|
||||
commands = [
|
||||
BotCommand(command=command.command, description=command.description)
|
||||
for command in self.commands
|
||||
]
|
||||
|
||||
logger.info(
|
||||
"Registering commands %s with a default scope 'BotCommandScopeDefault'"
|
||||
)
|
||||
|
||||
await self.set_bot_commands(commands)
|
||||
return
|
||||
|
||||
for command_set in command_sets:
|
||||
logger.info(
|
||||
"Registering command set with commands %s and scope '%s' (%s)",
|
||||
command_set.commands,
|
||||
command_set.scope,
|
||||
command_set.language_code,
|
||||
)
|
||||
await self.set_bot_commands(
|
||||
command_set.commands,
|
||||
command_set.scope,
|
||||
language_code=command_set.language_code,
|
||||
)
|
||||
|
||||
async def remove_commands(
|
||||
self, command_sets: Union[List[CommandSet], None] = None
|
||||
) -> None:
|
||||
"""Remove commands stored in bot's 'commands' attribute"""
|
||||
|
||||
if command_sets is None:
|
||||
logger.info(
|
||||
"Removing commands with a default scope 'BotCommandScopeDefault'"
|
||||
)
|
||||
await self.delete_bot_commands(BotCommandScopeDefault())
|
||||
return
|
||||
|
||||
for command_set in command_sets:
|
||||
logger.info(
|
||||
"Removing command set with scope '%s' (%s)",
|
||||
command_set.scope,
|
||||
command_set.language_code,
|
||||
)
|
||||
await self.delete_bot_commands(
|
||||
command_set.scope,
|
||||
language_code=command_set.language_code,
|
||||
)
|
9
src/libbot/pyrogram/classes/command.py
Normal file
9
src/libbot/pyrogram/classes/command.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class PyroCommand:
|
||||
"""Command stored in PyroClient's 'commands' attribute"""
|
||||
|
||||
command: str
|
||||
description: str
|
35
src/libbot/pyrogram/classes/commandset.py
Normal file
35
src/libbot/pyrogram/classes/commandset.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Union
|
||||
|
||||
try:
|
||||
from pyrogram.types import (
|
||||
BotCommand,
|
||||
BotCommandScopeAllChatAdministrators,
|
||||
BotCommandScopeAllGroupChats,
|
||||
BotCommandScopeAllPrivateChats,
|
||||
BotCommandScopeChat,
|
||||
BotCommandScopeChatAdministrators,
|
||||
BotCommandScopeChatMember,
|
||||
BotCommandScopeDefault,
|
||||
)
|
||||
except ImportError as exc:
|
||||
raise ImportError(
|
||||
"You need to install libbot[pyrogram] in order to use this class."
|
||||
) from exc
|
||||
|
||||
|
||||
@dataclass
|
||||
class CommandSet:
|
||||
"""Command stored in PyroClient's 'commands' attribute"""
|
||||
|
||||
commands: List[BotCommand]
|
||||
scope: Union[
|
||||
BotCommandScopeDefault,
|
||||
BotCommandScopeAllPrivateChats,
|
||||
BotCommandScopeAllGroupChats,
|
||||
BotCommandScopeAllChatAdministrators,
|
||||
BotCommandScopeChat,
|
||||
BotCommandScopeChatAdministrators,
|
||||
BotCommandScopeChatMember,
|
||||
] = BotCommandScopeDefault
|
||||
language_code: str = ""
|
116
src/libbot/sync/__init__.py
Normal file
116
src/libbot/sync/__init__.py
Normal file
@@ -0,0 +1,116 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
|
||||
try:
|
||||
from ujson import dumps, loads
|
||||
except ImportError:
|
||||
from json import dumps, loads
|
||||
|
||||
|
||||
def json_read(path: Union[str, Path]) -> Any:
|
||||
"""Read contents of a JSON file
|
||||
|
||||
### Args:
|
||||
* path (`Union[str, Path]`): Path-like object or path as a string
|
||||
|
||||
### Returns:
|
||||
* `Any`: File contents
|
||||
"""
|
||||
with open(str(path), mode="r", encoding="utf-8") as f:
|
||||
data = f.read()
|
||||
return loads(data)
|
||||
|
||||
|
||||
def json_write(data: Any, path: Union[str, Path]) -> None:
|
||||
"""Write contents to a JSON file
|
||||
|
||||
### Args:
|
||||
* data (`Any`): Contents to write. Must be a JSON serializable
|
||||
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
|
||||
"""
|
||||
with open(str(path), mode="w", encoding="utf-8") as f:
|
||||
f.write(
|
||||
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
|
||||
if hasattr(dumps, "escape_forward_slashes")
|
||||
else dumps(data, ensure_ascii=False, indent=4)
|
||||
)
|
||||
|
||||
|
||||
def nested_set(target: dict, value: Any, *path: str, create_missing=True) -> dict:
|
||||
"""Set the key by its path to the value
|
||||
|
||||
### Args:
|
||||
* target (`dict`): Dictionary to perform modifications on
|
||||
* value (`Any`): Any data
|
||||
* *path (`str`): Path to the key of the target
|
||||
* create_missing (`bool`, *optional*): Create keys on the way if they're missing. Defaults to `True`
|
||||
|
||||
### Returns:
|
||||
* `dict`: Changed dictionary
|
||||
"""
|
||||
d = target
|
||||
for key in path[:-1]:
|
||||
if key in d:
|
||||
d = d[key]
|
||||
elif create_missing:
|
||||
d = d.setdefault(key, {})
|
||||
else:
|
||||
raise KeyError(
|
||||
f"Key '{key}' is not found under path provided ({path}) and create_missing is False"
|
||||
)
|
||||
if path[-1] in d or create_missing:
|
||||
d[path[-1]] = value
|
||||
return target
|
||||
|
||||
|
||||
def config_get(
|
||||
key: str, *path: str, config_file: Union[str, Path] = "config.json"
|
||||
) -> Any:
|
||||
"""Get a value of the config key by its path provided
|
||||
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
|
||||
|
||||
### Args:
|
||||
* key (`str`): Key that contains the value
|
||||
* *path (`str`): Path to the key that contains the value
|
||||
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
|
||||
|
||||
### Returns:
|
||||
* `Any`: Key's value
|
||||
|
||||
### Example:
|
||||
Get the "salary" of "Pete" from this JSON structure:
|
||||
```json
|
||||
{
|
||||
"users": {
|
||||
"Pete": {
|
||||
"salary": 10.0
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This can be easily done with the following code:
|
||||
```python
|
||||
import libbot
|
||||
salary = libbot.sync.config_get("salary", "users", "Pete")
|
||||
```
|
||||
"""
|
||||
this_key = json_read(config_file)
|
||||
for dict_key in path:
|
||||
this_key = this_key[dict_key]
|
||||
return this_key[key]
|
||||
|
||||
|
||||
def config_set(
|
||||
key: str, value: Any, *path: str, config_file: Union[str, Path] = "config.json"
|
||||
) -> None:
|
||||
"""Set config's key by its path to the value
|
||||
|
||||
### Args:
|
||||
* key (`str`): Key that leads to the value
|
||||
* value (`Any`): Any JSON serializable data
|
||||
* *path (`str`): Path to the key of the target
|
||||
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
|
||||
"""
|
||||
json_write(nested_set(json_read(config_file), value, *(*path, key)), config_file)
|
||||
return
|
Reference in New Issue
Block a user