Compare commits
29 Commits
Author | SHA1 | Date | |
---|---|---|---|
461642a529
|
|||
e5c0f5c1d1
|
|||
00b1058014
|
|||
bc5be37ff1
|
|||
7c756d7065
|
|||
3273b86b75
|
|||
783443e448
|
|||
723cc40221
|
|||
b6537a50ae
|
|||
e508f37089
|
|||
d66bb9c93e
|
|||
253c85985b
|
|||
11d49fd476
|
|||
dc107ebdb3
|
|||
33c33d08e2
|
|||
295e77e403
|
|||
279a8e9d84
|
|||
ae374511cd
|
|||
eb23d3e9b6
|
|||
f4e74b5bc6
|
|||
7b8434ae71
|
|||
8c2054f496
|
|||
fe9cc3674f
|
|||
c71a7555f9
|
|||
cb755faa9a
|
|||
1859d0532c
|
|||
ebce8e0141
|
|||
9af6d5cb7c
|
|||
7f054a6d93
|
8
.gitignore
vendored
8
.gitignore
vendored
@@ -161,8 +161,8 @@ cython_debug/
|
||||
#.idea/
|
||||
|
||||
# Project
|
||||
venv
|
||||
venv_linux
|
||||
venv_windows
|
||||
venv/
|
||||
venv_linux/
|
||||
venv_windows/
|
||||
|
||||
.vscode
|
||||
.vscode/
|
@@ -1,9 +1,6 @@
|
||||
__name__ = "libbot"
|
||||
__version__ = "1.2"
|
||||
__version__ = "0.2.0"
|
||||
__license__ = "GPL3"
|
||||
__author__ = "Profitroll"
|
||||
|
||||
from . import i18n, pyrogram, sync
|
||||
from .__main__ import *
|
||||
from . import sync
|
||||
from . import i18n
|
||||
from . import pyrogram
|
||||
|
@@ -35,6 +35,8 @@ async def json_write(data: Any, path: Union[str, Path]) -> None:
|
||||
async with aiofiles.open(str(path), mode="w", encoding="utf-8") as f:
|
||||
await f.write(
|
||||
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
|
||||
if hasattr(dumps, "escape_forward_slashes")
|
||||
else dumps(data, ensure_ascii=False, indent=4)
|
||||
)
|
||||
|
||||
|
||||
|
@@ -1,35 +1,37 @@
|
||||
from os import listdir
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from libbot import config_get, json_read, sync
|
||||
import libbot
|
||||
from libbot.i18n import sync
|
||||
from libbot.i18n.classes.bot_locale import BotLocale
|
||||
|
||||
|
||||
async def _(key: str, *args: str, locale: str = sync.config_get("locale")) -> Any:
|
||||
async def _(
|
||||
key: str,
|
||||
*args: str,
|
||||
locale: Union[str, None] = "en",
|
||||
locales_root: Union[str, Path] = Path("locale"),
|
||||
) -> Any:
|
||||
"""Get value of locale string
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path
|
||||
* *args (`list`): Path to key like: `dict[args][key]`
|
||||
* locale (`str`): Locale to looked up in. Defaults to config's `"locale"` value
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locale (`Union[str, None]`): Locale to looked up in. Defaults to `"en"`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
|
||||
"""
|
||||
if locale is None:
|
||||
locale = sync.config_get("locale")
|
||||
locale = libbot.sync.config_get("locale") if locale is None else locale
|
||||
|
||||
try:
|
||||
this_dict = await json_read(
|
||||
Path(f'{await config_get("locale", "locations")}/{locale}.json')
|
||||
)
|
||||
this_dict = await libbot.json_read(Path(f"{locales_root}/{locale}.json"))
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
this_dict = await json_read(
|
||||
Path(
|
||||
f'{await config_get("locale", "locations")}/{await config_get("locale")}.json'
|
||||
)
|
||||
this_dict = await libbot.json_read(
|
||||
Path(f'{locales_root}/{await libbot.config_get("locale")}.json')
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
@@ -44,26 +46,27 @@ async def _(key: str, *args: str, locale: str = sync.config_get("locale")) -> An
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
|
||||
async def in_all_locales(key: str, *args: str) -> list:
|
||||
async def in_all_locales(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> list:
|
||||
"""Get value of the provided key and path in all available locales
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `list`: List of values in all locales
|
||||
"""
|
||||
|
||||
output = []
|
||||
files_locales = listdir(await config_get("locale", "locations"))
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = await json_read(
|
||||
Path(f'{await config_get("locale", "locations")}/{lc}.json')
|
||||
)
|
||||
this_dict = await libbot.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
@@ -79,26 +82,27 @@ async def in_all_locales(key: str, *args: str) -> list:
|
||||
return output
|
||||
|
||||
|
||||
async def in_every_locale(key: str, *args: str) -> Dict[str, Any]:
|
||||
async def in_every_locale(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> Dict[str, Any]:
|
||||
"""Get value of the provided key and path in every available locale with locale tag
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
|
||||
"""
|
||||
|
||||
output = {}
|
||||
files_locales = listdir(await config_get("locale", "locations"))
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = await json_read(
|
||||
Path(f'{await config_get("locale", "locations")}/{lc}.json')
|
||||
)
|
||||
this_dict = await libbot.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
|
@@ -8,25 +8,29 @@ from libbot import sync
|
||||
class BotLocale:
|
||||
"""Small addon that can be used by bot clients' classes in order to minimize I/O"""
|
||||
|
||||
def __init__(self, locales_folder: Union[str, Path, None] = None) -> None:
|
||||
if locales_folder is None:
|
||||
locales_folder = Path(sync.config_get("locale", "locations"))
|
||||
elif isinstance(locales_folder, str):
|
||||
locales_folder = Path(locales_folder)
|
||||
elif not isinstance(locales_folder, Path):
|
||||
raise TypeError("'locales_folder' must be a valid path or path-like object")
|
||||
def __init__(
|
||||
self,
|
||||
default_locale: Union[str, None] = "en",
|
||||
locales_root: Union[str, Path] = Path("locale"),
|
||||
) -> None:
|
||||
if isinstance(locales_root, str):
|
||||
locales_root = Path(locales_root)
|
||||
elif not isinstance(locales_root, Path):
|
||||
raise TypeError("'locales_root' must be a valid path or path-like object")
|
||||
|
||||
files_locales: list = listdir(locales_folder)
|
||||
files_locales: list = listdir(locales_root)
|
||||
|
||||
valid_locales: list = [
|
||||
".".join(entry.split(".")[:-1]) for entry in files_locales
|
||||
]
|
||||
|
||||
self.default: str = sync.config_get("locale")
|
||||
self.default: str = (
|
||||
sync.config_get("locale") if default_locale is None else default_locale
|
||||
)
|
||||
self.locales: dict = {}
|
||||
|
||||
for lc in valid_locales:
|
||||
self.locales[lc] = sync.json_read(Path(f"{locales_folder}/{lc}.json"))
|
||||
self.locales[lc] = sync.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
|
||||
def _(self, key: str, *args: str, locale: Union[str, None] = None) -> Any:
|
||||
"""Get value of locale string
|
||||
|
@@ -1,33 +1,36 @@
|
||||
from os import listdir
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from libbot import sync
|
||||
from libbot.sync import config_get, json_read
|
||||
import libbot
|
||||
|
||||
|
||||
def _(key: str, *args: str, locale: str = sync.config_get("locale")) -> Any:
|
||||
def _(
|
||||
key: str,
|
||||
*args: str,
|
||||
locale: Union[str, None] = "en",
|
||||
locales_root: Union[str, Path] = Path("locale"),
|
||||
) -> Any:
|
||||
"""Get value of locale string
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path
|
||||
* *args (`list`): Path to key like: `dict[args][key]`
|
||||
* locale (`str`): Locale to looked up in. Defaults to config's `"locale"` value
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locale (`Union[str, None]`): Locale to looked up in. Defaults to `"en"`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
|
||||
"""
|
||||
if locale is None:
|
||||
locale = sync.config_get("locale")
|
||||
locale = libbot.sync.config_get("locale")
|
||||
|
||||
try:
|
||||
this_dict = json_read(
|
||||
Path(f'{config_get("locale", "locations")}/{locale}.json')
|
||||
)
|
||||
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{locale}.json"))
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
this_dict = json_read(
|
||||
Path(f'{config_get("locale", "locations")}/{config_get("locale")}.json')
|
||||
this_dict = libbot.sync.json_read(
|
||||
Path(f'{locales_root}/{libbot.sync.config_get("locale")}.json')
|
||||
)
|
||||
except FileNotFoundError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
@@ -42,26 +45,27 @@ def _(key: str, *args: str, locale: str = sync.config_get("locale")) -> Any:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
|
||||
|
||||
|
||||
def in_all_locales(key: str, *args: str) -> list:
|
||||
def in_all_locales(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> list:
|
||||
"""Get value of the provided key and path in all available locales
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `list`: List of values in all locales
|
||||
"""
|
||||
|
||||
output = []
|
||||
files_locales = listdir(config_get("locale", "locations"))
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = json_read(
|
||||
Path(f'{config_get("locale", "locations")}/{lc}.json')
|
||||
)
|
||||
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
@@ -77,26 +81,27 @@ def in_all_locales(key: str, *args: str) -> list:
|
||||
return output
|
||||
|
||||
|
||||
def in_every_locale(key: str, *args: str) -> Dict[str, Any]:
|
||||
def in_every_locale(
|
||||
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
|
||||
) -> Dict[str, Any]:
|
||||
"""Get value of the provided key and path in every available locale with locale tag
|
||||
|
||||
### Args:
|
||||
* key (`str`): The last key of the locale's keys path.
|
||||
* *args (`list`): Path to key like: `dict[args][key]`.
|
||||
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
|
||||
|
||||
### Returns:
|
||||
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
|
||||
"""
|
||||
|
||||
output = {}
|
||||
files_locales = listdir(config_get("locale", "locations"))
|
||||
files_locales = listdir(locales_root)
|
||||
|
||||
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
|
||||
for lc in valid_locales:
|
||||
try:
|
||||
this_dict = json_read(
|
||||
Path(f'{config_get("locale", "locations")}/{lc}.json')
|
||||
)
|
||||
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{lc}.json"))
|
||||
except FileNotFoundError:
|
||||
continue
|
||||
|
||||
|
@@ -1,12 +1,15 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from os import getpid
|
||||
from os import cpu_count, getpid
|
||||
from pathlib import Path
|
||||
from time import time
|
||||
from typing import List, Union
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
try:
|
||||
import pyrogram
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.schedulers.background import BackgroundScheduler
|
||||
from pyrogram.client import Client
|
||||
from pyrogram.errors import BadRequest
|
||||
from pyrogram.handlers.message_handler import MessageHandler
|
||||
@@ -40,29 +43,67 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PyroClient(Client):
|
||||
def __init__(self):
|
||||
with open("config.json", "r", encoding="utf-8") as f:
|
||||
self.config: dict = loads(f.read())
|
||||
def __init__(
|
||||
self,
|
||||
name: str = "bot_client",
|
||||
config: Union[Dict[str, Any], None] = None,
|
||||
config_path: Union[str, Path] = Path("config.json"),
|
||||
api_id: Union[int, None] = None,
|
||||
api_hash: Union[str, None] = None,
|
||||
bot_token: Union[str, None] = None,
|
||||
workers: int = min(32, cpu_count() + 4),
|
||||
locales_root: Union[str, Path, None] = None,
|
||||
plugins_root: str = "plugins",
|
||||
plugins_exclude: Union[List[str], None] = None,
|
||||
sleep_threshold: int = 120,
|
||||
max_concurrent_transmissions: int = 1,
|
||||
commands_source: Union[Dict[str, dict], None] = None,
|
||||
scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = None,
|
||||
**kwargs,
|
||||
):
|
||||
if config is None:
|
||||
with open(config_path, "r", encoding="utf-8") as f:
|
||||
self.config: dict = loads(f.read())
|
||||
else:
|
||||
self.config = config
|
||||
|
||||
super().__init__(
|
||||
name="bot_client",
|
||||
api_id=self.config["bot"]["api_id"],
|
||||
api_hash=self.config["bot"]["api_hash"],
|
||||
bot_token=self.config["bot"]["bot_token"],
|
||||
# Workers should be commented when using convopyro, otherwise
|
||||
name=name,
|
||||
api_id=self.config["bot"]["api_id"] if api_id is None else api_id,
|
||||
api_hash=self.config["bot"]["api_hash"] if api_hash is None else api_hash,
|
||||
bot_token=self.config["bot"]["bot_token"]
|
||||
if bot_token is None
|
||||
else bot_token,
|
||||
# Workers should be `min(32, cpu_count() + 4)`, otherwise
|
||||
# handlers land in another event loop and you won't see them
|
||||
workers=self.config["bot"]["workers"],
|
||||
plugins=dict(root="plugins", exclude=self.config["disabled_plugins"]),
|
||||
sleep_threshold=120,
|
||||
workers=self.config["bot"]["workers"]
|
||||
if "workers" in self.config["bot"]
|
||||
else workers,
|
||||
plugins=dict(
|
||||
root=plugins_root,
|
||||
exclude=self.config["disabled_plugins"]
|
||||
if plugins_exclude is None
|
||||
else plugins_exclude,
|
||||
),
|
||||
sleep_threshold=sleep_threshold,
|
||||
max_concurrent_transmissions=self.config["bot"][
|
||||
"max_concurrent_transmissions"
|
||||
],
|
||||
]
|
||||
if "max_concurrent_transmissions" in self.config["bot"]
|
||||
else max_concurrent_transmissions,
|
||||
**kwargs,
|
||||
)
|
||||
self.owner: int = self.config["bot"]["owner"]
|
||||
self.commands: List[PyroCommand] = []
|
||||
self.commands_source: Dict[str, dict] = (
|
||||
self.config["commands"] if commands_source is None else commands_source
|
||||
)
|
||||
self.scoped_commands: bool = self.config["bot"]["scoped_commands"]
|
||||
self.start_time: float = 0
|
||||
|
||||
self.bot_locale: BotLocale = BotLocale(Path(self.config["locations"]["locale"]))
|
||||
self.bot_locale: BotLocale = BotLocale(
|
||||
(Path("locale") if locales_root is None else locales_root)
|
||||
)
|
||||
self.default_locale: str = self.bot_locale.default
|
||||
self.locales: dict = self.bot_locale.locales
|
||||
|
||||
@@ -70,7 +111,11 @@ class PyroClient(Client):
|
||||
self.in_all_locales = self.bot_locale.in_all_locales
|
||||
self.in_every_locale = self.bot_locale.in_every_locale
|
||||
|
||||
async def start(self, scheduler):
|
||||
self.scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = scheduler
|
||||
|
||||
self.scopes_placeholders: Dict[str, int] = {"owner": self.owner}
|
||||
|
||||
async def start(self, register_commands: bool = True):
|
||||
await super().start()
|
||||
|
||||
self.start_time = time()
|
||||
@@ -85,32 +130,50 @@ class PyroClient(Client):
|
||||
|
||||
try:
|
||||
await self.send_message(
|
||||
chat_id=self.config["reports"]["chat_id"],
|
||||
chat_id=self.owner
|
||||
if self.config["reports"]["chat_id"] == "owner"
|
||||
else self.config["reports"]["chat_id"],
|
||||
text=f"Bot started PID `{getpid()}`",
|
||||
)
|
||||
|
||||
scheduler.add_job(
|
||||
self.register_commands,
|
||||
trigger="date",
|
||||
run_date=datetime.now() + timedelta(seconds=5),
|
||||
kwargs={"command_sets": await self.collect_commands()},
|
||||
)
|
||||
if self.scheduler is None:
|
||||
return
|
||||
|
||||
scheduler.start()
|
||||
if register_commands:
|
||||
self.scheduler.add_job(
|
||||
self.register_commands,
|
||||
trigger="date",
|
||||
run_date=datetime.now() + timedelta(seconds=5),
|
||||
kwargs={"command_sets": await self.collect_commands()},
|
||||
)
|
||||
|
||||
self.scheduler.start()
|
||||
except BadRequest:
|
||||
logger.warning("Unable to send message to report chat.")
|
||||
|
||||
async def stop(self):
|
||||
async def stop(self, exit_completely: bool = True):
|
||||
try:
|
||||
await self.send_message(
|
||||
chat_id=self.config["reports"]["chat_id"],
|
||||
chat_id=self.owner
|
||||
if self.config["reports"]["chat_id"] == "owner"
|
||||
else self.config["reports"]["chat_id"],
|
||||
text=f"Bot stopped with PID `{getpid()}`",
|
||||
)
|
||||
await asyncio.sleep(0.5)
|
||||
except BadRequest:
|
||||
logger.warning("Unable to send message to report chat.")
|
||||
|
||||
await super().stop()
|
||||
logger.warning("Bot stopped with PID %s.", getpid())
|
||||
|
||||
if exit_completely:
|
||||
try:
|
||||
exit()
|
||||
except SystemExit as exp:
|
||||
raise SystemExit(
|
||||
"Bot has been shut down, this is not an application error!"
|
||||
) from exp
|
||||
|
||||
async def collect_commands(self) -> Union[List[CommandSet], None]:
|
||||
"""Gather list of the bot's commands
|
||||
|
||||
@@ -126,7 +189,7 @@ class PyroClient(Client):
|
||||
command_sets = []
|
||||
|
||||
# Iterate through all commands in config
|
||||
for command, contents in self.config["commands"].items():
|
||||
for command, contents in self.commands_source.items():
|
||||
# Iterate through all scopes of a command
|
||||
for scope in contents["scopes"]:
|
||||
if dumps(scope) not in scopes:
|
||||
@@ -151,8 +214,9 @@ class PyroClient(Client):
|
||||
scope_dict = loads(scope)
|
||||
|
||||
# Replace "owner" in the bot scope with owner's id
|
||||
if "chat_id" in scope_dict and scope_dict["chat_id"] == "owner":
|
||||
scope_dict["chat_id"] = self.owner
|
||||
for placeholder, chat_id in self.scopes_placeholders.items():
|
||||
if "chat_id" in scope_dict and scope_dict["chat_id"] == placeholder:
|
||||
scope_dict["chat_id"] = chat_id
|
||||
|
||||
# Create object with the same name and args from the dict
|
||||
try:
|
||||
|
@@ -29,7 +29,11 @@ def json_write(data: Any, path: Union[str, Path]) -> None:
|
||||
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
|
||||
"""
|
||||
with open(str(path), mode="w", encoding="utf-8") as f:
|
||||
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
|
||||
f.write(
|
||||
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
|
||||
if hasattr(dumps, "escape_forward_slashes")
|
||||
else dumps(data, ensure_ascii=False, indent=4)
|
||||
)
|
||||
|
||||
|
||||
def nested_set(target: dict, value: Any, *path: str, create_missing=True) -> dict:
|
||||
@@ -51,7 +55,9 @@ def nested_set(target: dict, value: Any, *path: str, create_missing=True) -> dic
|
||||
elif create_missing:
|
||||
d = d.setdefault(key, {})
|
||||
else:
|
||||
return target
|
||||
raise KeyError(
|
||||
f"Key '{key}' is not found under path provided ({path}) and create_missing is False"
|
||||
)
|
||||
if path[-1] in d or create_missing:
|
||||
d[path[-1]] = value
|
||||
return target
|
||||
|
@@ -4,12 +4,12 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "libbot"
|
||||
version = "1.2"
|
||||
version = "0.2.0"
|
||||
authors = [{ name = "Profitroll" }]
|
||||
description = "Universal bot library with functions needed for basic Discord/Telegram bot development."
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.8"
|
||||
license = { text = "GPL3" }
|
||||
license = { file = "LICENSE" }
|
||||
classifiers = [
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Intended Audience :: Developers",
|
||||
@@ -25,8 +25,8 @@ classifiers = [
|
||||
dependencies = ["aiofiles~=23.1.0"]
|
||||
|
||||
[project.optional-dependencies]
|
||||
pycord = ["py-cord>=2.0.0"]
|
||||
pyrogram = ["pyrogram>=2.0.0"]
|
||||
pycord = ["py-cord~=2.4.1"]
|
||||
pyrogram = ["pyrogram~=2.0.106", "apscheduler~=3.10.1"]
|
||||
speed = ["ujson~=5.8.0"]
|
||||
|
||||
[project.urls]
|
||||
@@ -48,7 +48,13 @@ packages = [
|
||||
[tool.setuptools_scm]
|
||||
|
||||
[tool.black]
|
||||
target-version = ['py38', 'py38', 'py39', 'py310', 'py311']
|
||||
target-version = ['py38', 'py39', 'py310', 'py311']
|
||||
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
minversion = "6.0"
|
||||
python_files = ["test_*.py"]
|
||||
pythonpath = "."
|
||||
testpaths = ["tests"]
|
||||
|
4
setup.py
4
setup.py
@@ -1,4 +0,0 @@
|
||||
from setuptools import setup
|
||||
|
||||
if __name__ == "__main__":
|
||||
setup()
|
6
tests/config.json
Normal file
6
tests/config.json
Normal file
@@ -0,0 +1,6 @@
|
||||
{
|
||||
"locale": "en",
|
||||
"bot": {
|
||||
"bot_token": "sample_token"
|
||||
}
|
||||
}
|
1
tests/data/empty.json
Normal file
1
tests/data/empty.json
Normal file
@@ -0,0 +1 @@
|
||||
{}
|
3
tests/data/invalid.json
Normal file
3
tests/data/invalid.json
Normal file
@@ -0,0 +1,3 @@
|
||||
{
|
||||
"foo": 'bar'
|
||||
}
|
11
tests/data/locale/en.json
Normal file
11
tests/data/locale/en.json
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"foo": "bar",
|
||||
"messages": {
|
||||
"example": "okay"
|
||||
},
|
||||
"callbacks": {
|
||||
"default": {
|
||||
"nested": "sure"
|
||||
}
|
||||
}
|
||||
}
|
11
tests/data/locale/uk.json
Normal file
11
tests/data/locale/uk.json
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"foo": "бар",
|
||||
"messages": {
|
||||
"example": "окей"
|
||||
},
|
||||
"callbacks": {
|
||||
"default": {
|
||||
"nested": "авжеж"
|
||||
}
|
||||
}
|
||||
}
|
15
tests/data/test.json
Normal file
15
tests/data/test.json
Normal file
@@ -0,0 +1,15 @@
|
||||
{
|
||||
"foo": "bar",
|
||||
"abcdefg": [
|
||||
"higklmnop",
|
||||
{
|
||||
"lol": {
|
||||
"kek": [
|
||||
1.0000035,
|
||||
0.2542,
|
||||
1337
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
53
tests/test_bot_locale.py
Normal file
53
tests/test_bot_locale.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot.i18n import BotLocale
|
||||
|
||||
bot_locale = BotLocale(Path("tests/data/locale"))
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, locale, expected",
|
||||
[
|
||||
("foo", [], None, "bar"),
|
||||
("foo", [], "uk", "бар"),
|
||||
("example", ["messages"], None, "okay"),
|
||||
("example", ["messages"], "uk", "окей"),
|
||||
("nested", ["callbacks", "default"], None, "sure"),
|
||||
("nested", ["callbacks", "default"], "uk", "авжеж"),
|
||||
],
|
||||
)
|
||||
def test_bot_locale_get(
|
||||
key: str, args: List[str], locale: Union[str, None], expected: Any
|
||||
):
|
||||
assert (
|
||||
bot_locale._(key, *args, locale=locale)
|
||||
if locale is not None
|
||||
else bot_locale._(key, *args)
|
||||
) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, expected",
|
||||
[
|
||||
("foo", [], ["bar", "бар"]),
|
||||
("example", ["messages"], ["okay", "окей"]),
|
||||
("nested", ["callbacks", "default"], ["sure", "авжеж"]),
|
||||
],
|
||||
)
|
||||
def test_i18n_in_all_locales(key: str, args: List[str], expected: Any):
|
||||
assert (bot_locale.in_all_locales(key, *args)) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, expected",
|
||||
[
|
||||
("foo", [], {"en": "bar", "uk": "бар"}),
|
||||
("example", ["messages"], {"en": "okay", "uk": "окей"}),
|
||||
("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}),
|
||||
],
|
||||
)
|
||||
def test_i18n_in_every_locale(key: str, args: List[str], expected: Any):
|
||||
assert (bot_locale.in_every_locale(key, *args)) == expected
|
49
tests/test_config_async.py
Normal file
49
tests/test_config_async.py
Normal file
@@ -0,0 +1,49 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, List
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot import config_get, config_set
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"args, expected",
|
||||
[
|
||||
(["locale"], "en"),
|
||||
(["bot_token", "bot"], "sample_token"),
|
||||
],
|
||||
)
|
||||
async def test_config_get_valid(args: List[str], expected: str):
|
||||
assert (
|
||||
await config_get(args[0], *args[1:], config_file=Path("tests/config.json"))
|
||||
== expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"args, expected",
|
||||
[
|
||||
(["bot_stonks", "bot"], pytest.raises(KeyError)),
|
||||
],
|
||||
)
|
||||
async def test_config_get_invalid(args: List[str], expected: Any):
|
||||
with expected:
|
||||
assert (
|
||||
await config_get(args[0], *args[1:], config_file=Path("tests/config.json"))
|
||||
== expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"key, path, value",
|
||||
[
|
||||
("locale", [], "en"),
|
||||
("bot_token", ["bot"], "sample_token"),
|
||||
],
|
||||
)
|
||||
async def test_config_set(key: str, path: List[str], value: Any):
|
||||
await config_set(key, value, *path, config_file=Path("tests/config.json"))
|
||||
assert await config_get(key, *path, config_file=Path("tests/config.json")) == value
|
46
tests/test_config_sync.py
Normal file
46
tests/test_config_sync.py
Normal file
@@ -0,0 +1,46 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, List
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot import sync
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"args, expected",
|
||||
[
|
||||
(["locale"], "en"),
|
||||
(["bot_token", "bot"], "sample_token"),
|
||||
],
|
||||
)
|
||||
def test_config_get_valid(args: List[str], expected: str):
|
||||
assert (
|
||||
sync.config_get(args[0], *args[1:], config_file=Path("tests/config.json"))
|
||||
== expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"args, expected",
|
||||
[
|
||||
(["bot_stonks", "bot"], pytest.raises(KeyError)),
|
||||
],
|
||||
)
|
||||
def test_config_get_invalid(args: List[str], expected: Any):
|
||||
with expected:
|
||||
assert (
|
||||
sync.config_get(args[0], *args[1:], config_file=Path("tests/config.json"))
|
||||
== expected
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, path, value",
|
||||
[
|
||||
("locale", [], "en"),
|
||||
("bot_token", ["bot"], "sample_token"),
|
||||
],
|
||||
)
|
||||
def test_config_set(key: str, path: List[str], value: Any):
|
||||
sync.config_set(key, value, *path, config_file=Path("tests/config.json"))
|
||||
assert sync.config_get(key, *path, config_file=Path("tests/config.json")) == value
|
58
tests/test_i18n_async.py
Normal file
58
tests/test_i18n_async.py
Normal file
@@ -0,0 +1,58 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot import i18n
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, locale, expected",
|
||||
[
|
||||
("foo", [], None, "bar"),
|
||||
("foo", [], "uk", "бар"),
|
||||
("example", ["messages"], None, "okay"),
|
||||
("example", ["messages"], "uk", "окей"),
|
||||
("nested", ["callbacks", "default"], None, "sure"),
|
||||
("nested", ["callbacks", "default"], "uk", "авжеж"),
|
||||
],
|
||||
)
|
||||
async def test_i18n_get(
|
||||
key: str, args: List[str], locale: Union[str, None], expected: Any
|
||||
):
|
||||
assert (
|
||||
await i18n._(key, *args, locale=locale, locales_root=Path("tests/data/locale"))
|
||||
if locale is not None
|
||||
else await i18n._(key, *args, locales_root=Path("tests/data/locale"))
|
||||
) == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, expected",
|
||||
[
|
||||
("foo", [], ["bar", "бар"]),
|
||||
("example", ["messages"], ["okay", "окей"]),
|
||||
("nested", ["callbacks", "default"], ["sure", "авжеж"]),
|
||||
],
|
||||
)
|
||||
async def test_i18n_in_all_locales(key: str, args: List[str], expected: Any):
|
||||
assert (
|
||||
await i18n.in_all_locales(key, *args, locales_root=Path("tests/data/locale"))
|
||||
) == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, expected",
|
||||
[
|
||||
("foo", [], {"en": "bar", "uk": "бар"}),
|
||||
("example", ["messages"], {"en": "okay", "uk": "окей"}),
|
||||
("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}),
|
||||
],
|
||||
)
|
||||
async def test_i18n_in_every_locale(key: str, args: List[str], expected: Any):
|
||||
assert (
|
||||
await i18n.in_every_locale(key, *args, locales_root=Path("tests/data/locale"))
|
||||
) == expected
|
53
tests/test_i18n_sync.py
Normal file
53
tests/test_i18n_sync.py
Normal file
@@ -0,0 +1,53 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot.i18n import sync
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, locale, expected",
|
||||
[
|
||||
("foo", [], None, "bar"),
|
||||
("foo", [], "uk", "бар"),
|
||||
("example", ["messages"], None, "okay"),
|
||||
("example", ["messages"], "uk", "окей"),
|
||||
("nested", ["callbacks", "default"], None, "sure"),
|
||||
("nested", ["callbacks", "default"], "uk", "авжеж"),
|
||||
],
|
||||
)
|
||||
def test_i18n_get(key: str, args: List[str], locale: Union[str, None], expected: Any):
|
||||
assert (
|
||||
sync._(key, *args, locale=locale, locales_root=Path("tests/data/locale"))
|
||||
if locale is not None
|
||||
else sync._(key, *args, locales_root=Path("tests/data/locale"))
|
||||
) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, expected",
|
||||
[
|
||||
("foo", [], ["bar", "бар"]),
|
||||
("example", ["messages"], ["okay", "окей"]),
|
||||
("nested", ["callbacks", "default"], ["sure", "авжеж"]),
|
||||
],
|
||||
)
|
||||
def test_i18n_in_all_locales(key: str, args: List[str], expected: Any):
|
||||
assert (
|
||||
sync.in_all_locales(key, *args, locales_root=Path("tests/data/locale"))
|
||||
) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"key, args, expected",
|
||||
[
|
||||
("foo", [], {"en": "bar", "uk": "бар"}),
|
||||
("example", ["messages"], {"en": "okay", "uk": "окей"}),
|
||||
("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}),
|
||||
],
|
||||
)
|
||||
def test_i18n_in_every_locale(key: str, args: List[str], expected: Any):
|
||||
assert (
|
||||
sync.in_every_locale(key, *args, locales_root=Path("tests/data/locale"))
|
||||
) == expected
|
64
tests/test_json_async.py
Normal file
64
tests/test_json_async.py
Normal file
@@ -0,0 +1,64 @@
|
||||
try:
|
||||
from ujson import JSONDecodeError, dumps
|
||||
except ImportError:
|
||||
from json import dumps, JSONDecodeError
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot import json_read, json_write
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"path, expected",
|
||||
[
|
||||
(
|
||||
"tests/data/test.json",
|
||||
{
|
||||
"foo": "bar",
|
||||
"abcdefg": ["higklmnop", {"lol": {"kek": [1.0000035, 0.2542, 1337]}}],
|
||||
},
|
||||
),
|
||||
("tests/data/empty.json", {}),
|
||||
],
|
||||
)
|
||||
async def test_json_read_valid(path: Union[str, Path], expected: Any):
|
||||
assert await json_read(path) == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"path, expected",
|
||||
[
|
||||
("tests/data/invalid.json", JSONDecodeError),
|
||||
("tests/data/nonexistent.json", FileNotFoundError),
|
||||
],
|
||||
)
|
||||
async def test_json_read_invalid(path: Union[str, Path], expected: Any):
|
||||
with pytest.raises(expected):
|
||||
await json_read(path)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
"data, path",
|
||||
[
|
||||
(
|
||||
{
|
||||
"foo": "bar",
|
||||
"abcdefg": ["higklmnop", {"lol": {"kek": [1.0000035, 0.2542, 1337]}}],
|
||||
},
|
||||
"tests/data/test.json",
|
||||
),
|
||||
({}, "tests/data/empty.json"),
|
||||
],
|
||||
)
|
||||
async def test_json_write(data: Any, path: Union[str, Path]):
|
||||
await json_write(data, path)
|
||||
|
||||
assert Path(path).is_file()
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
assert f.read() == dumps(data, ensure_ascii=False, indent=4)
|
61
tests/test_json_sync.py
Normal file
61
tests/test_json_sync.py
Normal file
@@ -0,0 +1,61 @@
|
||||
try:
|
||||
from ujson import JSONDecodeError, dumps
|
||||
except ImportError:
|
||||
from json import dumps, JSONDecodeError
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Any, Union
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot import sync
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path, expected",
|
||||
[
|
||||
(
|
||||
"tests/data/test.json",
|
||||
{
|
||||
"foo": "bar",
|
||||
"abcdefg": ["higklmnop", {"lol": {"kek": [1.0000035, 0.2542, 1337]}}],
|
||||
},
|
||||
),
|
||||
("tests/data/empty.json", {}),
|
||||
],
|
||||
)
|
||||
def test_json_read_valid(path: Union[str, Path], expected: Any):
|
||||
assert sync.json_read(path) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"path, expected",
|
||||
[
|
||||
("tests/data/invalid.json", JSONDecodeError),
|
||||
("tests/data/nonexistent.json", FileNotFoundError),
|
||||
],
|
||||
)
|
||||
def test_json_read_invalid(path: Union[str, Path], expected: Any):
|
||||
with pytest.raises(expected):
|
||||
assert sync.json_read(path) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"data, path",
|
||||
[
|
||||
(
|
||||
{
|
||||
"foo": "bar",
|
||||
"abcdefg": ["higklmnop", {"lol": {"kek": [1.0000035, 0.2542, 1337]}}],
|
||||
},
|
||||
"tests/data/test.json",
|
||||
),
|
||||
({}, "tests/data/empty.json"),
|
||||
],
|
||||
)
|
||||
def test_json_write(data: Any, path: Union[str, Path]):
|
||||
sync.json_write(data, path)
|
||||
|
||||
assert Path(path).is_file()
|
||||
with open(path, "r", encoding="utf-8") as f:
|
||||
assert f.read() == dumps(data, ensure_ascii=False, indent=4)
|
63
tests/test_nested_set.py
Normal file
63
tests/test_nested_set.py
Normal file
@@ -0,0 +1,63 @@
|
||||
from typing import Any, List
|
||||
|
||||
import pytest
|
||||
|
||||
from libbot import sync
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"target, value, path, create_missing, expected",
|
||||
[
|
||||
({"foo": "bar"}, "rab", ["foo"], True, {"foo": "rab"}),
|
||||
({"foo": "bar"}, {"123": 456}, ["foo"], True, {"foo": {"123": 456}}),
|
||||
(
|
||||
{"foo": {"bar": {}}},
|
||||
True,
|
||||
["foo", "bar", "test"],
|
||||
True,
|
||||
{"foo": {"bar": {"test": True}}},
|
||||
),
|
||||
(
|
||||
{"foo": {"bar": {}}},
|
||||
True,
|
||||
["foo", "bar", "test"],
|
||||
False,
|
||||
{"foo": {"bar": {}}},
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_nested_set_valid(
|
||||
target: dict[str, Any],
|
||||
value: Any,
|
||||
path: List[str],
|
||||
create_missing: bool,
|
||||
expected: Any,
|
||||
):
|
||||
assert (
|
||||
sync.nested_set(target, value, *path, create_missing=create_missing)
|
||||
) == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"target, value, path, create_missing, expected",
|
||||
[
|
||||
(
|
||||
{"foo": {"bar": {}}},
|
||||
True,
|
||||
["foo", "bar", "test1", "test2"],
|
||||
False,
|
||||
KeyError,
|
||||
),
|
||||
],
|
||||
)
|
||||
def test_nested_set_invalid(
|
||||
target: dict[str, Any],
|
||||
value: Any,
|
||||
path: List[str],
|
||||
create_missing: bool,
|
||||
expected: Any,
|
||||
):
|
||||
with pytest.raises(expected):
|
||||
assert (
|
||||
sync.nested_set(target, value, *path, create_missing=create_missing)
|
||||
) == expected
|
Reference in New Issue
Block a user