v4.0.0 to dev #168

Merged
profitroll merged 3 commits from overhaul-v4 into dev 2024-12-26 19:44:57 +02:00
36 changed files with 788 additions and 777 deletions

View File

@ -8,17 +8,16 @@ dynamic = ["version", "dependencies", "optional-dependencies"]
authors = [{ name = "Profitroll" }] authors = [{ name = "Profitroll" }]
description = "Universal bot library with functions needed for basic Discord/Telegram bot development." description = "Universal bot library with functions needed for basic Discord/Telegram bot development."
readme = "README.md" readme = "README.md"
requires-python = ">=3.9" requires-python = ">=3.11"
license = { text = "GPLv3" } license = { text = "GPLv3" }
classifiers = [ classifiers = [
"Development Status :: 3 - Alpha", "Development Status :: 3 - Alpha",
"Intended Audience :: Developers", "Intended Audience :: Developers",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)", "License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules", "Topic :: Software Development :: Libraries :: Python Modules",
"Topic :: Utilities", "Topic :: Utilities",
] ]
@ -42,7 +41,8 @@ Tracker = "https://git.end-play.xyz/profitroll/LibBotUniversal/issues"
where = ["src"] where = ["src"]
[tool.black] [tool.black]
target-version = ['py39', 'py310', 'py311' ,'py312'] line-length = 108
target-version = ["py311", "py312", "py313"]
[tool.isort] [tool.isort]
profile = "black" profile = "black"
@ -52,6 +52,8 @@ minversion = "6.0"
python_files = ["test_*.py"] python_files = ["test_*.py"]
pythonpath = "." pythonpath = "."
testpaths = ["tests"] testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
[tool.mypy] [tool.mypy]
namespace_packages = true namespace_packages = true
@ -59,9 +61,12 @@ install_types = true
strict = true strict = true
show_error_codes = true show_error_codes = true
[tool.pylint]
disable = ["line-too-long"]
[tool.pylint.main] [tool.pylint.main]
extension-pkg-whitelist = ["ujson"] extension-pkg-whitelist = ["ujson"]
py-version = 3.9 py-version = 3.11
[tool.coverage.run] [tool.coverage.run]
source = ["libbot"] source = ["libbot"]

View File

@ -1,6 +1,5 @@
__version__ = "3.3.1" __version__ = "4.0.0"
__license__ = "GPL3" __license__ = "GPL3"
__author__ = "Profitroll" __author__ = "Profitroll"
from . import errors, i18n, pycord, pyrogram, sync from . import utils, errors, i18n, pycord, pyrogram
from .__main__ import config_delete, config_get, config_set, json_read, json_write

View File

@ -1,131 +0,0 @@
from pathlib import Path
from typing import Any, Union
import aiofiles
try:
from ujson import dumps, loads
except ImportError:
from json import dumps, loads
from ._utils import supports_argument
from .sync._nested import nested_delete, nested_set
DEFAULT_CONFIG_LOCATION: str = "config.json"
async def json_read(path: Union[str, Path]) -> Any:
"""Read contents of a JSON file
### Args:
* path (`Union[str, Path]`): Path-like object or path as a string
### Returns:
* `Any`: File contents
"""
async with aiofiles.open(str(path), mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: Union[str, Path]) -> None:
"""Write contents to a JSON file
### Args:
* data (`Any`): Contents to write. Must be a JSON serializable
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
"""
async with aiofiles.open(str(path), mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
if supports_argument(dumps, "escape_forward_slashes")
else dumps(data, ensure_ascii=False, indent=4)
)
async def config_get(
key: str, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION
) -> Any:
"""Get a value of the config key by its path provided
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
### Args:
* key (`str`): Key that contains the value
* *path (`str`): Path to the key that contains the value
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Returns:
* `Any`: Key's value
### Example:
Get the "salary" of "Pete" from this JSON structure:
```json
{
"users": {
"Pete": {
"salary": 10.0
}
}
}
```
This can be easily done with the following code:
```python
import libbot
salary = await libbot.config_get("salary", "users", "Pete")
```
"""
this_key = await json_read(config_file)
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(
key: str, value: Any, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION
) -> None:
"""Set config's key by its path to the value
### Args:
* key (`str`): Key that leads to the value
* value (`Any`): Any JSON serializable data
* *path (`str`): Path to the key of the target
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided
"""
await json_write(
nested_set(await json_read(config_file), value, *(*path, key)), config_file
)
async def config_delete(
key: str,
*path: str,
missing_ok: bool = False,
config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION,
) -> None:
"""Set config's key by its path
### Args:
* key (`str`): Key to delete
* *path (`str`): Path to the key of the target
* missing_ok (`bool`): Do not raise an exception if the key is missing. Defaults to `False`
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided and `missing_ok` is `False`
"""
config_data = await json_read(config_file)
try:
nested_delete(config_data, *(*path, key))
except KeyError as exc:
if not missing_ok:
raise exc from exc
await json_write(config_data, config_file)

View File

@ -1,22 +0,0 @@
import inspect
from typing import Callable
def supports_argument(func: Callable, arg_name: str) -> bool:
"""Check whether a function has a specific argument
### Args:
* func (`Callable`): Function to be inspected
* arg_name (`str`): Argument to be checked
### Returns:
* `bool`: `True` if argument is supported and `False` if not
"""
if hasattr(func, "__code__"):
return arg_name in inspect.signature(func).parameters
elif hasattr(func, "__doc__"):
if doc := func.__doc__:
first_line = doc.splitlines()[0]
return arg_name in first_line
return False

View File

@ -1,15 +1,15 @@
from typing import Any, List, Optional, Union from typing import Any, List, Optional
class ConfigKeyError(Exception): class ConfigKeyError(Exception):
"""Raised when config key is not found. """Raised when config key is not found.
### Attributes: ### Attributes:
* key (`Union[str, List[str]]`): Missing config key. * key (`str | List[str]`): Missing config key.
""" """
def __init__(self, key: Union[str, List[str]]) -> None: def __init__(self, key: str | List[str]) -> None:
self.key: Union[str, List[str]] = key self.key: str | List[str] = key
super().__init__( super().__init__(
f"Config key {'.'.join(key) if isinstance(key, list) else key} is missing. Please set in your config file." f"Config key {'.'.join(key) if isinstance(key, list) else key} is missing. Please set in your config file."
) )
@ -22,12 +22,12 @@ class ConfigValueError(Exception):
"""Raised when config key's value is invalid. """Raised when config key's value is invalid.
### Attributes: ### Attributes:
* key (`Union[str, List[str]]`): Invalid config key. * key (`str | List[str]`): Invalid config key.
* value (`Optional[Any]`): Key's correct value. * value (`Optional[Any]`): Key's correct value.
""" """
def __init__(self, key: Union[str, List[str]], value: Optional[Any] = None) -> None: def __init__(self, key: str | List[str], value: Optional[Any] = None) -> None:
self.key: Union[str, List[str]] = key self.key: str | List[str] = key
self.value: Optional[Any] = value self.value: Optional[Any] = value
super().__init__( super().__init__(
f"Config key {'.'.join(key) if isinstance(key, list) else key} has invalid value. {f'Must be {value}. ' if value else ''}Please set in your config file." f"Config key {'.'.join(key) if isinstance(key, list) else key} has invalid value. {f'Must be {value}. ' if value else ''}Please set in your config file."

View File

@ -1,118 +1,2 @@
from os import listdir from ._functions import _, in_all_locales, in_every_locale
from pathlib import Path from .classes import BotLocale
from typing import Any, Dict, Union
import libbot
from libbot.i18n import sync
from libbot.i18n.classes.bot_locale import BotLocale
async def _(
key: str,
*args: str,
locale: Union[str, None] = "en",
locales_root: Union[str, Path] = Path("locale"),
) -> Any:
"""Get value of locale string
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`.
* locale (`Union[str, None]`): Locale to looked up in. Defaults to `"en"`.
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
"""
locale = libbot.sync.config_get("locale") if locale is None else locale
try:
this_dict = await libbot.json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
try:
this_dict = await libbot.json_read(
Path(f'{locales_root}/{await libbot.config_get("locale")}.json')
)
except FileNotFoundError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
return this_key[key]
except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
async def in_all_locales(
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
) -> list:
"""Get value of the provided key and path in all available locales
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`.
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `list`: List of values in all locales
"""
output = []
files_locales = listdir(locales_root)
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
for lc in valid_locales:
try:
this_dict = await libbot.json_read(Path(f"{locales_root}/{lc}.json"))
except FileNotFoundError:
continue
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output.append(this_key[key])
except KeyError:
continue
return output
async def in_every_locale(
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
) -> Dict[str, Any]:
"""Get value of the provided key and path in every available locale with locale tag
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`.
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
"""
output = {}
files_locales = listdir(locales_root)
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
for lc in valid_locales:
try:
this_dict = await libbot.json_read(Path(f"{locales_root}/{lc}.json"))
except FileNotFoundError:
continue
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output[lc] = this_key[key]
except KeyError:
continue
return output

View File

@ -0,0 +1,232 @@
from os import listdir, PathLike
from pathlib import Path
from typing import Any, Dict, List
from ..utils.config import config_get
from ..utils.json import json_read
from ..utils.syncs import asyncable
def _get_valid_locales(locales_root: str | PathLike[str]) -> List[str]:
return [".".join(entry.split(".")[:-1]) for entry in listdir(locales_root)]
@asyncable
def _(
key: str,
*args: str,
locale: str | None = "en",
locales_root: str | Path = Path("locale"),
) -> Any:
"""Get value of locale string
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`str`): Path to key like: `dict[args][key]`.
* locale (`str | None`): Locale to looked up in. Defaults to `"en"`.
* locales_root (`str | Path`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Any`: Value of provided locale key. Is usually `str`, `Dict[str, Any]` or `List[Any]`
"""
if locale is None:
locale: str = config_get("locale")
try:
this_dict: Dict[str, Any] = json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
try:
this_dict: Dict[str, Any] = json_read(Path(f'{locales_root}/{config_get("locale")}.json'))
except FileNotFoundError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
this_key: Dict[str, Any] = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
return this_key[key]
except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
@_.asynchronous
async def _(
key: str,
*args: str,
locale: str | None = "en",
locales_root: str | Path = Path("locale"),
) -> Any:
"""Get value of locale string
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`str`): Path to key like: `dict[args][key]`.
* locale (`str | None`): Locale to looked up in. Defaults to `"en"`.
* locales_root (`str | Path`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Any`: Value of provided locale key. Is usually `str`, `Dict[str, Any]` or `List[Any]`
"""
locale: str = config_get("locale") if locale is None else locale
try:
this_dict: Dict[str, Any] = await json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
try:
this_dict: Dict[str, Any] = await json_read(
Path(f'{locales_root}/{await config_get("locale")}.json')
)
except FileNotFoundError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
this_key: Dict[str, Any] = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
return this_key[key]
except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
@asyncable
def in_all_locales(key: str, *args: str, locales_root: str | Path = Path("locale")) -> List[Any]:
"""Get value of the provided key and path in all available locales
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`str`): Path to key like: `dict[args][key]`.
* locales_root (`str | Path`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `List[Any]`: List of values in all locales
"""
output: List[Any] = []
for locale in _get_valid_locales(locales_root):
try:
this_dict: Dict[str, Any] = json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
continue
this_key: Dict[str, Any] = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output.append(this_key[key])
except KeyError:
continue
return output
@in_all_locales.asynchronous
async def in_all_locales(key: str, *args: str, locales_root: str | Path = Path("locale")) -> List[Any]:
"""Get value of the provided key and path in all available locales
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`str`): Path to key like: `dict[args][key]`.
* locales_root (`str | Path`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `List[Any]`: List of values in all locales
"""
output: List[Any] = []
for locale in _get_valid_locales(locales_root):
try:
this_dict: Dict[str, Any] = await json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
continue
this_key: Dict[str, Any] = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output.append(this_key[key])
except KeyError:
continue
return output
@asyncable
def in_every_locale(
key: str, *args: str, locales_root: str | Path = Path("locale")
) -> Dict[str, Any]:
"""Get value of the provided key and path in every available locale with locale tag
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`str`): Path to key like: `dict[args][key]`.
* locales_root (`str | Path`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Dict[str, Any]`: Locale is a key, and it's value from locale file is a value
"""
output: Dict[str, Any] = {}
for locale in _get_valid_locales(locales_root):
try:
this_dict: Dict[str, Any] = json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
continue
this_key: Dict[str, Any] = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output[locale] = this_key[key]
except KeyError:
continue
return output
@in_every_locale.asynchronous
async def in_every_locale(
key: str, *args: str, locales_root: str | Path = Path("locale")
) -> Dict[str, Any]:
"""Get value of the provided key and path in every available locale with locale tag
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`str`): Path to key like: `dict[args][key]`.
* locales_root (`str | Path`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Dict[str, Any]`: Locale is a key, and it's value from locale file is a value
"""
output: Dict[str, Any] = {}
for locale in _get_valid_locales(locales_root):
try:
this_dict: Dict[str, Any] = await json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
continue
this_key: Dict[str, Any] = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output[locale] = this_key[key]
except KeyError:
continue
return output

View File

@ -0,0 +1 @@
from .bot_locale import BotLocale

View File

@ -1,8 +1,9 @@
from os import listdir from os import listdir
from pathlib import Path from pathlib import Path
from typing import Any, Dict, Union from typing import Any, Dict, List
from libbot import sync from ...utils.config import config_get
from ...utils.json import json_read
class BotLocale: class BotLocale:
@ -10,52 +11,50 @@ class BotLocale:
def __init__( def __init__(
self, self,
default_locale: Union[str, None] = "en", default_locale: str | None = "en",
locales_root: Union[str, Path] = Path("locale"), locales_root: str | Path = Path("locale"),
) -> None: ) -> None:
if isinstance(locales_root, str): if isinstance(locales_root, str):
locales_root = Path(locales_root) locales_root = Path(locales_root)
elif not isinstance(locales_root, Path): elif not isinstance(locales_root, Path):
raise TypeError("'locales_root' must be a valid path or path-like object") raise TypeError("'locales_root' must be a valid path or path-like object")
files_locales: list = listdir(locales_root) files_locales: List[str] = listdir(locales_root)
valid_locales: list = [ valid_locales: List[str] = [".".join(entry.split(".")[:-1]) for entry in files_locales]
".".join(entry.split(".")[:-1]) for entry in files_locales
]
self.default: str = ( self.default: str = config_get("locale") if default_locale is None else default_locale
sync.config_get("locale") if default_locale is None else default_locale self.locales: Dict[str, Any] = {}
)
self.locales: dict = {}
for lc in valid_locales: for locale in valid_locales:
self.locales[lc] = sync.json_read(Path(f"{locales_root}/{lc}.json")) self.locales[locale] = json_read(Path(f"{locales_root}/{locale}.json"))
def _(self, key: str, *args: str, locale: Union[str, None] = None) -> Any: def _(self, key: str, *args: str, locale: str | None = None) -> Any:
"""Get value of locale string """Get value of locale string
### Args: ### Args:
* key (`str`): The last key of the locale's keys path * key (`str`): The last key of the locale's keys path
* *args (`list`): Path to key like: `dict[args][key]` * *args (`str`): Path to key like: `dict[args][key]`
* locale (`Union[str, None]`, *optional*): Locale to looked up in. Defaults to config's `"locale"` value * locale (`str | None`, *optional*): Locale to looked up in. Defaults to config's `"locale"` value
### Returns: ### Returns:
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list` * `Any`: Value of provided locale key. Is usually `str`, `Dict[str, Any]` or `List[Any]`
""" """
if locale is None: if locale is None:
locale = self.default locale: str = self.default
try: try:
this_dict = self.locales[locale] this_dict: Dict[str, Any] = self.locales[locale]
except KeyError: except KeyError:
try: try:
this_dict = self.locales[self.default] this_dict: Dict[str, Any] = self.locales[self.default]
except KeyError: except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"' return (
f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
)
this_key: Dict[str, Any] = this_dict
this_key = this_dict
for dict_key in args: for dict_key in args:
this_key = this_key[dict_key] this_key = this_key[dict_key]
@ -64,26 +63,26 @@ class BotLocale:
except KeyError: except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"' return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
def in_all_locales(self, key: str, *args: str) -> list: def in_all_locales(self, key: str, *args: str) -> List[Any]:
"""Get value of the provided key and path in all available locales """Get value of the provided key and path in all available locales
### Args: ### Args:
* key (`str`): The last key of the locale's keys path. * key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`. * *args (`str`): Path to key like: `dict[args][key]`.
### Returns: ### Returns:
* `list`: List of values in all locales * `List[Any]`: List of values in all locales
""" """
output: List[Any] = []
output = [] for name, locale in self.locales.items():
for name, lc in self.locales.items():
try: try:
this_dict = lc this_dict: Dict[str, Any] = locale
except KeyError: except KeyError:
continue continue
this_key = this_dict this_key: Dict[str, Any] = this_dict
for dict_key in args: for dict_key in args:
this_key = this_key[dict_key] this_key = this_key[dict_key]
@ -99,21 +98,21 @@ class BotLocale:
### Args: ### Args:
* key (`str`): The last key of the locale's keys path. * key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`. * *args (`str`): Path to key like: `dict[args][key]`.
### Returns: ### Returns:
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value * `Dict[str, Any]`: Locale is a key, and it's value from locale file is a value
""" """
output: Dict[str, Any] = {}
output = {} for name, locale in self.locales.items():
for name, lc in self.locales.items():
try: try:
this_dict = lc this_dict: Dict[str, Any] = locale
except KeyError: except KeyError:
continue continue
this_key = this_dict this_key: Dict[str, Any] = this_dict
for dict_key in args: for dict_key in args:
this_key = this_key[dict_key] this_key = this_key[dict_key]

View File

@ -1,117 +0,0 @@
from os import listdir
from pathlib import Path
from typing import Any, Dict, Union
import libbot
def _(
key: str,
*args: str,
locale: Union[str, None] = "en",
locales_root: Union[str, Path] = Path("locale"),
) -> Any:
"""Get value of locale string
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`.
* locale (`Union[str, None]`): Locale to looked up in. Defaults to `"en"`.
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Any`: Value of provided locale key. Is usually `str`, `dict` or `list`
"""
if locale is None:
locale = libbot.sync.config_get("locale")
try:
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{locale}.json"))
except FileNotFoundError:
try:
this_dict = libbot.sync.json_read(
Path(f'{locales_root}/{libbot.sync.config_get("locale")}.json')
)
except FileNotFoundError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
return this_key[key]
except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {args} from locale "{locale}"'
def in_all_locales(
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
) -> list:
"""Get value of the provided key and path in all available locales
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`.
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `list`: List of values in all locales
"""
output = []
files_locales = listdir(locales_root)
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
for lc in valid_locales:
try:
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{lc}.json"))
except FileNotFoundError:
continue
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output.append(this_key[key])
except KeyError:
continue
return output
def in_every_locale(
key: str, *args: str, locales_root: Union[str, Path] = Path("locale")
) -> Dict[str, Any]:
"""Get value of the provided key and path in every available locale with locale tag
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: `dict[args][key]`.
* locales_root (`Union[str, Path]`, *optional*): Folder where locales are located. Defaults to `Path("locale")`.
### Returns:
* `Dict[str, Any]`: Locale is a key and it's value from locale file is a value
"""
output = {}
files_locales = listdir(locales_root)
valid_locales = [".".join(entry.split(".")[:-1]) for entry in files_locales]
for lc in valid_locales:
try:
this_dict = libbot.sync.json_read(Path(f"{locales_root}/{lc}.json"))
except FileNotFoundError:
continue
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
output[lc] = this_key[key]
except KeyError:
continue
return output

View File

View File

@ -1,35 +1,35 @@
import logging import logging
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Union from typing import Any, Dict
from typing_extensions import override
try: try:
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.background import BackgroundScheduler
from discord import Bot from discord import Bot
except ImportError as exc: except ImportError as exc:
raise ImportError( raise ImportError("You need to install libbot[pycord] in order to use this class.") from exc
"You need to install libbot[pycord] in order to use this class."
) from exc
try: try:
from ujson import loads from ujson import loads
except ImportError: except ImportError:
from json import loads from json import loads
from libbot.i18n import BotLocale from ...i18n.classes import BotLocale
from libbot.i18n.sync import _
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PycordBot(Bot): class PycordBot(Bot):
@override
def __init__( def __init__(
self, self,
config: Union[Dict[str, Any], None] = None,
config_path: Union[str, Path] = Path("config.json"),
locales_root: Union[str, Path, None] = None,
scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = None,
*args, *args,
config: Dict[str, Any] | None = None,
config_path: str | Path = Path("config.json"),
locales_root: str | Path | None = None,
scheduler: AsyncIOScheduler | BackgroundScheduler | None = None,
**kwargs, **kwargs,
): ):
if config is None: if config is None:
@ -39,9 +39,7 @@ class PycordBot(Bot):
self.config = config self.config = config
super().__init__( super().__init__(
debug_guilds=( debug_guilds=(self.config["bot"]["debug_guilds"] if self.config["debug"] else None),
self.config["bot"]["debug_guilds"] if self.config["debug"] else None
),
owner_ids=self.config["bot"]["owners"], owner_ids=self.config["bot"]["owners"],
*args, *args,
**kwargs, **kwargs,
@ -58,14 +56,16 @@ class PycordBot(Bot):
self.in_all_locales = self.bot_locale.in_all_locales self.in_all_locales = self.bot_locale.in_all_locales
self.in_every_locale = self.bot_locale.in_every_locale self.in_every_locale = self.bot_locale.in_every_locale
self.scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = scheduler self.scheduler: AsyncIOScheduler | BackgroundScheduler | None = scheduler
@override
async def start(self, token: str, reconnect: bool = True, scheduler_start: bool = True) -> None: async def start(self, token: str, reconnect: bool = True, scheduler_start: bool = True) -> None:
if self.scheduler is not None and scheduler_start: if self.scheduler is not None and scheduler_start:
self.scheduler.start() self.scheduler.start()
await super().start(token, reconnect=reconnect) await super().start(token, reconnect=reconnect)
@override
async def close(self, scheduler_shutdown: bool = True, scheduler_wait: bool = True) -> None: async def close(self, scheduler_shutdown: bool = True, scheduler_wait: bool = True) -> None:
if self.scheduler is not None and scheduler_shutdown: if self.scheduler is not None and scheduler_shutdown:
self.scheduler.shutdown(scheduler_wait) self.scheduler.shutdown(scheduler_wait)

View File

View File

@ -1,10 +1,13 @@
import asyncio import asyncio
import logging import logging
import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
from os import cpu_count, getpid from os import cpu_count, getpid
from pathlib import Path from pathlib import Path
from time import time from time import time
from typing import Any, Dict, List, Union from typing import Any, Dict, List
from typing_extensions import override
try: try:
import pyrogram import pyrogram
@ -25,43 +28,42 @@ try:
BotCommandScopeDefault, BotCommandScopeDefault,
) )
except ImportError as exc: except ImportError as exc:
raise ImportError( raise ImportError("You need to install libbot[pyrogram] in order to use this class.") from exc
"You need to install libbot[pyrogram] in order to use this class."
) from exc
try: try:
from ujson import dumps, loads from ujson import dumps, loads
except ImportError: except ImportError:
from json import dumps, loads from json import dumps, loads
from libbot.i18n import BotLocale from ...i18n.classes import BotLocale
from libbot.i18n.sync import _ from ...i18n import _
from libbot.pyrogram.classes.command import PyroCommand from .command import PyroCommand
from libbot.pyrogram.classes.commandset import CommandSet from .commandset import CommandSet
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class PyroClient(Client): class PyroClient(Client):
@override
def __init__( def __init__(
self, self,
name: str = "bot_client", name: str = "bot_client",
owner: Union[int, None] = None, owner: int | None = None,
config: Union[Dict[str, Any], None] = None, config: Dict[str, Any] | None = None,
config_path: Union[str, Path] = Path("config.json"), config_path: str | Path = Path("config.json"),
api_id: Union[int, None] = None, api_id: int | None = None,
api_hash: Union[str, None] = None, api_hash: str | None = None,
bot_token: Union[str, None] = None, bot_token: str | None = None,
workers: int = min(32, cpu_count() + 4), workers: int = min(32, cpu_count() + 4),
locales_root: Union[str, Path, None] = None, locales_root: str | Path | None = None,
plugins_root: str = "plugins", plugins_root: str = "plugins",
plugins_exclude: Union[List[str], None] = None, plugins_exclude: List[str] | None = None,
sleep_threshold: int = 120, sleep_threshold: int = 120,
max_concurrent_transmissions: int = 1, max_concurrent_transmissions: int = 1,
commands_source: Union[Dict[str, dict], None] = None, commands_source: Dict[str, dict] | None = None,
scoped_commands: Union[bool, None] = None, scoped_commands: bool | None = None,
i18n_bot_info: bool = False, i18n_bot_info: bool = False,
scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = None, scheduler: AsyncIOScheduler | BackgroundScheduler | None = None,
**kwargs, **kwargs,
): ):
if config is None: if config is None:
@ -74,26 +76,20 @@ class PyroClient(Client):
name=name, name=name,
api_id=self.config["bot"]["api_id"] if api_id is None else api_id, api_id=self.config["bot"]["api_id"] if api_id is None else api_id,
api_hash=self.config["bot"]["api_hash"] if api_hash is None else api_hash, api_hash=self.config["bot"]["api_hash"] if api_hash is None else api_hash,
bot_token=self.config["bot"]["bot_token"] bot_token=self.config["bot"]["bot_token"] if bot_token is None else bot_token,
if bot_token is None
else bot_token,
# Workers should be `min(32, cpu_count() + 4)`, otherwise # Workers should be `min(32, cpu_count() + 4)`, otherwise
# handlers land in another event loop and you won't see them # handlers land in another event loop and you won't see them
workers=self.config["bot"]["workers"] workers=self.config["bot"]["workers"] if "workers" in self.config["bot"] else workers,
if "workers" in self.config["bot"]
else workers,
plugins=dict( plugins=dict(
root=plugins_root, root=plugins_root,
exclude=self.config["disabled_plugins"] exclude=self.config["disabled_plugins"] if plugins_exclude is None else plugins_exclude,
if plugins_exclude is None
else plugins_exclude,
), ),
sleep_threshold=sleep_threshold, sleep_threshold=sleep_threshold,
max_concurrent_transmissions=self.config["bot"][ max_concurrent_transmissions=(
"max_concurrent_transmissions" self.config["bot"]["max_concurrent_transmissions"]
] if "max_concurrent_transmissions" in self.config["bot"]
if "max_concurrent_transmissions" in self.config["bot"] else max_concurrent_transmissions
else max_concurrent_transmissions, ),
**kwargs, **kwargs,
) )
self.owner: int = self.config["bot"]["owner"] if owner is None else owner self.owner: int = self.config["bot"]["owner"] if owner is None else owner
@ -102,9 +98,7 @@ class PyroClient(Client):
self.config["commands"] if commands_source is None else commands_source self.config["commands"] if commands_source is None else commands_source
) )
self.scoped_commands: bool = ( self.scoped_commands: bool = (
self.config["bot"]["scoped_commands"] self.config["bot"]["scoped_commands"] if scoped_commands is None else scoped_commands
if scoped_commands is None
else scoped_commands
) )
self.start_time: float = 0 self.start_time: float = 0
@ -119,12 +113,13 @@ class PyroClient(Client):
self.in_all_locales = self.bot_locale.in_all_locales self.in_all_locales = self.bot_locale.in_all_locales
self.in_every_locale = self.bot_locale.in_every_locale self.in_every_locale = self.bot_locale.in_every_locale
self.scheduler: Union[AsyncIOScheduler, BackgroundScheduler, None] = scheduler self.scheduler: AsyncIOScheduler | BackgroundScheduler | None = scheduler
self.scopes_placeholders: Dict[str, int] = {"owner": self.owner} self.scopes_placeholders: Dict[str, int] = {"owner": self.owner}
self.i18n_bot_info: bool = i18n_bot_info self.i18n_bot_info: bool = i18n_bot_info
@override
async def start(self, register_commands: bool = True, scheduler_start: bool = True) -> None: async def start(self, register_commands: bool = True, scheduler_start: bool = True) -> None:
await super().start() await super().start()
@ -178,7 +173,7 @@ class PyroClient(Client):
) )
logger.info( logger.info(
"Bot's info for the locale %s has been updated", "Bot's info for the locale %s has been updated",
self.code, code,
) )
except KeyError: except KeyError:
logger.warning( logger.warning(
@ -189,9 +184,11 @@ class PyroClient(Client):
# Send a message to the bot's reports chat about the startup # Send a message to the bot's reports chat about the startup
try: try:
await self.send_message( await self.send_message(
chat_id=self.owner chat_id=(
if self.config["reports"]["chat_id"] == "owner" self.owner
else self.config["reports"]["chat_id"], if self.config["reports"]["chat_id"] == "owner"
else self.config["reports"]["chat_id"]
),
text=f"Bot started PID `{getpid()}`", text=f"Bot started PID `{getpid()}`",
) )
except BadRequest: except BadRequest:
@ -212,14 +209,17 @@ class PyroClient(Client):
if scheduler_start: if scheduler_start:
self.scheduler.start() self.scheduler.start()
@override
async def stop( async def stop(
self, exit_completely: bool = True, scheduler_shutdown: bool = True, scheduler_wait: bool = True self, exit_completely: bool = True, scheduler_shutdown: bool = True, scheduler_wait: bool = True
) -> None: ) -> None:
try: try:
await self.send_message( await self.send_message(
chat_id=self.owner chat_id=(
if self.config["reports"]["chat_id"] == "owner" self.owner
else self.config["reports"]["chat_id"], if self.config["reports"]["chat_id"] == "owner"
else self.config["reports"]["chat_id"]
),
text=f"Bot stopped with PID `{getpid()}`", text=f"Bot stopped with PID `{getpid()}`",
) )
await asyncio.sleep(0.5) await asyncio.sleep(0.5)
@ -234,13 +234,11 @@ class PyroClient(Client):
if exit_completely: if exit_completely:
try: try:
exit() sys.exit()
except SystemExit as exc: except SystemExit as exc:
raise SystemExit( raise SystemExit("Bot has been shut down, this is not an application error!") from exc
"Bot has been shut down, this is not an application error!"
) from exc
async def collect_commands(self) -> Union[List[CommandSet], None]: async def collect_commands(self) -> List[CommandSet] | None:
"""Gather list of the bot's commands """Gather list of the bot's commands
### Returns: ### Returns:
@ -262,13 +260,9 @@ class PyroClient(Client):
scopes[dumps(scope)] = {"_": []} scopes[dumps(scope)] = {"_": []}
# Add command to the scope's flattened key in scopes dict # Add command to the scope's flattened key in scopes dict
scopes[dumps(scope)]["_"].append( scopes[dumps(scope)]["_"].append(BotCommand(command, _(command, "commands")))
BotCommand(command, _(command, "commands"))
)
for locale, string in ( for locale, string in (self.in_every_locale(command, "commands")).items():
self.in_every_locale(command, "commands")
).items():
if locale not in scopes[dumps(scope)]: if locale not in scopes[dumps(scope)]:
scopes[dumps(scope)][locale] = [] scopes[dumps(scope)][locale] = []
@ -287,11 +281,7 @@ class PyroClient(Client):
# Create object with the same name and args from the dict # Create object with the same name and args from the dict
try: try:
scope_obj = globals()[scope_dict["name"]]( scope_obj = globals()[scope_dict["name"]](
**{ **{key: value for key, value in scope_dict.items() if key != "name"}
key: value
for key, value in scope_dict.items()
if key != "name"
}
) )
except NameError: except NameError:
logger.error( logger.error(
@ -309,13 +299,9 @@ class PyroClient(Client):
# Add set of commands to the list of the command sets # Add set of commands to the list of the command sets
for locale, commands in locales.items(): for locale, commands in locales.items():
if locale == "_": if locale == "_":
command_sets.append( command_sets.append(CommandSet(commands, scope=scope_obj, language_code=""))
CommandSet(commands, scope=scope_obj, language_code="")
)
continue continue
command_sets.append( command_sets.append(CommandSet(commands, scope=scope_obj, language_code=locale))
CommandSet(commands, scope=scope_obj, language_code=locale)
)
logger.info("Registering the following command sets: %s", command_sets) logger.info("Registering the following command sets: %s", command_sets)
@ -323,7 +309,9 @@ class PyroClient(Client):
# This part here looks into the handlers and looks for commands # This part here looks into the handlers and looks for commands
# in it, if there are any. Then adds them to self.commands # in it, if there are any. Then adds them to self.commands
for handler in self.dispatcher.groups[0]: for handler in self.dispatcher.groups[0]:
if isinstance(handler, MessageHandler): if isinstance(handler, MessageHandler) and (
hasattr(handler.filters, "base") or hasattr(handler.filters, "other")
):
for entry in [handler.filters.base, handler.filters.other]: for entry in [handler.filters.base, handler.filters.other]:
if hasattr(entry, "commands"): if hasattr(entry, "commands"):
for command in entry.commands: for command in entry.commands:
@ -352,9 +340,7 @@ class PyroClient(Client):
command, command,
) )
async def register_commands( async def register_commands(self, command_sets: List[CommandSet] | None = None) -> None:
self, command_sets: Union[List[CommandSet], None] = None
) -> None:
"""Register commands stored in bot's 'commands' attribute""" """Register commands stored in bot's 'commands' attribute"""
if command_sets is None: if command_sets is None:
@ -363,10 +349,7 @@ class PyroClient(Client):
for command in self.commands for command in self.commands
] ]
logger.info( logger.info("Registering commands %s with a default scope 'BotCommandScopeDefault'", commands)
"Registering commands %s with a default scope 'BotCommandScopeDefault'",
commands
)
await self.set_bot_commands(commands) await self.set_bot_commands(commands)
return return
@ -384,15 +367,11 @@ class PyroClient(Client):
language_code=command_set.language_code, language_code=command_set.language_code,
) )
async def remove_commands( async def remove_commands(self, command_sets: List[CommandSet] | None = None) -> None:
self, command_sets: Union[List[CommandSet], None] = None
) -> None:
"""Remove commands stored in bot's 'commands' attribute""" """Remove commands stored in bot's 'commands' attribute"""
if command_sets is None: if command_sets is None:
logger.info( logger.info("Removing commands with a default scope 'BotCommandScopeDefault'")
"Removing commands with a default scope 'BotCommandScopeDefault'"
)
await self.delete_bot_commands(BotCommandScopeDefault()) await self.delete_bot_commands(BotCommandScopeDefault())
return return

View File

@ -1,5 +1,5 @@
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Union from typing import List
try: try:
from pyrogram.types import ( from pyrogram.types import (
@ -13,9 +13,7 @@ try:
BotCommandScopeDefault, BotCommandScopeDefault,
) )
except ImportError as exc: except ImportError as exc:
raise ImportError( raise ImportError("You need to install libbot[pyrogram] in order to use this class.") from exc
"You need to install libbot[pyrogram] in order to use this class."
) from exc
@dataclass @dataclass
@ -23,13 +21,13 @@ class CommandSet:
"""Command stored in PyroClient's 'commands' attribute""" """Command stored in PyroClient's 'commands' attribute"""
commands: List[BotCommand] commands: List[BotCommand]
scope: Union[ scope: (
BotCommandScopeDefault, BotCommandScopeDefault
BotCommandScopeAllPrivateChats, | BotCommandScopeAllPrivateChats
BotCommandScopeAllGroupChats, | BotCommandScopeAllGroupChats
BotCommandScopeAllChatAdministrators, | BotCommandScopeAllChatAdministrators
BotCommandScopeChat, | BotCommandScopeChat
BotCommandScopeChatAdministrators, | BotCommandScopeChatAdministrators
BotCommandScopeChatMember, | BotCommandScopeChatMember
] = BotCommandScopeDefault ) = BotCommandScopeDefault
language_code: str = "" language_code: str = ""

View File

@ -1 +0,0 @@
from .__main__ import config_delete, config_get, config_set, json_read, json_write

View File

@ -1,127 +0,0 @@
from pathlib import Path
from typing import Any, Union
from .._utils import supports_argument
from ._nested import nested_delete, nested_set
try:
from ujson import dumps, loads
except ImportError:
from json import dumps, loads
DEFAULT_CONFIG_LOCATION: str = "config.json"
def json_read(path: Union[str, Path]) -> Any:
"""Read contents of a JSON file
### Args:
* path (`Union[str, Path]`): Path-like object or path as a string
### Returns:
* `Any`: File contents
"""
with open(str(path), mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write(data: Any, path: Union[str, Path]) -> None:
"""Write contents to a JSON file
### Args:
* data (`Any`): Contents to write. Must be a JSON serializable
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
"""
with open(str(path), mode="w", encoding="utf-8") as f:
f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
if supports_argument(dumps, "escape_forward_slashes")
else dumps(data, ensure_ascii=False, indent=4)
)
def config_get(
key: str, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION
) -> Any:
"""Get a value of the config key by its path provided
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
### Args:
* key (`str`): Key that contains the value
* *path (`str`): Path to the key that contains the value
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Returns:
* `Any`: Key's value
### Example:
Get the "salary" of "Pete" from this JSON structure:
```json
{
"users": {
"Pete": {
"salary": 10.0
}
}
}
```
This can be easily done with the following code:
```python
import libbot
salary = libbot.sync.config_get("salary", "users", "Pete")
```
"""
this_key = json_read(config_file)
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set(
key: str, value: Any, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION
) -> None:
"""Set config's key by its path to the value
### Args:
* key (`str`): Key that leads to the value
* value (`Any`): Any JSON serializable data
* *path (`str`): Path to the key of the target
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided
"""
json_write(nested_set(json_read(config_file), value, *(*path, key)), config_file)
def config_delete(
key: str,
*path: str,
missing_ok: bool = False,
config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION,
) -> None:
"""Set config's key by its path
### Args:
* key (`str`): Key to delete
* *path (`str`): Path to the key of the target
* missing_ok (`bool`): Do not raise an exception if the key is missing. Defaults to `False`
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided and `missing_ok` is `False`
"""
config_data = json_read(config_file)
try:
nested_delete(config_data, *(*path, key))
except KeyError as exc:
if not missing_ok:
raise exc from exc
json_write(config_data, config_file)

View File

@ -0,0 +1,3 @@
from . import misc
from .config import config_get, config_set, config_delete
from .json import json_read, json_write

View File

@ -0,0 +1 @@
from ._functions import config_get, config_set, config_delete

View File

@ -0,0 +1,183 @@
from pathlib import Path
from typing import Any, Dict
from ..json import json_read, json_write
from ..misc import nested_delete, nested_set
from ..syncs import asyncable
try:
from ujson import dumps, loads
except ImportError:
from json import dumps, loads
DEFAULT_CONFIG_LOCATION: str = "config.json"
@asyncable
def config_get(key: str, *path: str, config_file: str | Path = DEFAULT_CONFIG_LOCATION) -> Any:
"""Get a value of the config key by its path provided
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
### Args:
* key (`str`): Key that contains the value
* *path (`str`): Path to the key that contains the value
* config_file (`str | Path`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Returns:
* `Any`: Key's value
### Example:
Get the "salary" of "Pete" from this JSON structure:
```json
{
"users": {
"Pete": {
"salary": 10.0
}
}
}
```
This can be easily done with the following code:
```python
import libbot
salary = libbot.sync.config_get("salary", "users", "Pete")
```
"""
this_key: Dict[str, Any] = json_read(config_file)
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
@config_get.asynchronous
async def config_get(key: str, *path: str, config_file: str | Path = DEFAULT_CONFIG_LOCATION) -> Any:
"""Get a value of the config key by its path provided
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
### Args:
* key (`str`): Key that contains the value
* *path (`str`): Path to the key that contains the value
* config_file (`str | Path`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Returns:
* `Any`: Key's value
### Example:
Get the "salary" of "Pete" from this JSON structure:
```json
{
"users": {
"Pete": {
"salary": 10.0
}
}
}
```
This can be easily done with the following code:
```python
import libbot
salary = await libbot.config_get("salary", "users", "Pete")
```
"""
this_key: Dict[str, Any] = await json_read(config_file)
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
@asyncable
def config_set(key: str, value: Any, *path: str, config_file: str | Path = DEFAULT_CONFIG_LOCATION) -> None:
"""Set config's key by its path to the value
### Args:
* key (`str`): Key that leads to the value
* value (`Any`): Any JSON serializable data
* *path (`str`): Path to the key of the target
* config_file (`str | Path`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided
"""
json_write(nested_set(json_read(config_file), value, *(*path, key)), config_file)
@config_set.asynchronous
async def config_set(
key: str, value: Any, *path: str, config_file: str | Path = DEFAULT_CONFIG_LOCATION
) -> None:
"""Set config's key by its path to the value
### Args:
* key (`str`): Key that leads to the value
* value (`Any`): Any JSON serializable data
* *path (`str`): Path to the key of the target
* config_file (`str | Path`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided
"""
await json_write(nested_set(await json_read(config_file), value, *(*path, key)), config_file)
@asyncable
def config_delete(
key: str,
*path: str,
missing_ok: bool = False,
config_file: str | Path = DEFAULT_CONFIG_LOCATION,
) -> None:
"""Set config's key by its path
### Args:
* key (`str`): Key to delete
* *path (`str`): Path to the key of the target
* missing_ok (`bool`): Do not raise an exception if the key is missing. Defaults to `False`
* config_file (`str | Path`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided and `missing_ok` is `False`
"""
config_data: Dict[str, Any] = json_read(config_file)
try:
nested_delete(config_data, *(*path, key))
except KeyError as exc:
if not missing_ok:
raise exc from exc
json_write(config_data, config_file)
@config_delete.asynchronous
async def config_delete(
key: str,
*path: str,
missing_ok: bool = False,
config_file: str | Path = DEFAULT_CONFIG_LOCATION,
) -> None:
"""Set config's key by its path
### Args:
* key (`str`): Key to delete
* *path (`str`): Path to the key of the target
* missing_ok (`bool`): Do not raise an exception if the key is missing. Defaults to `False`
* config_file (`str | Path`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided and `missing_ok` is `False`
"""
config_data: Dict[str, Any] = await json_read(config_file)
try:
nested_delete(config_data, *(*path, key))
except KeyError as exc:
if not missing_ok:
raise exc from exc
await json_write(config_data, config_file)

View File

@ -0,0 +1 @@
from ._functions import json_read, json_write

View File

@ -0,0 +1,76 @@
from pathlib import Path
from typing import Any
import aiofiles
from ..misc import supports_argument
from ..syncs import asyncable
try:
from ujson import dumps, loads
except ImportError:
from json import dumps, loads
@asyncable
def json_read(path: str | Path) -> Any:
"""Read contents of a JSON file
### Args:
* path (`str | Path`): Path-like object or path as a string
### Returns:
* `Any`: File contents
"""
with open(str(path), mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
@json_read.asynchronous
async def json_read(path: str | Path) -> Any:
"""Read contents of a JSON file
### Args:
* path (`str | Path`): Path-like object or path as a string
### Returns:
* `Any`: File contents
"""
async with aiofiles.open(str(path), mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
@asyncable
def json_write(data: Any, path: str | Path) -> None:
"""Write contents to a JSON file
### Args:
* data (`Any`): Contents to write. Must be a JSON serializable
* path (`str | Path`): Path-like object or path as a string of a destination
"""
with open(str(path), mode="w", encoding="utf-8") as f:
f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
if supports_argument(dumps, "escape_forward_slashes")
else dumps(data, ensure_ascii=False, indent=4)
)
@json_write.asynchronous
async def json_write(data: Any, path: str | Path) -> None:
"""Write contents to a JSON file
### Args:
* data (`Any`): Contents to write. Must be a JSON serializable
* path (`str | Path`): Path-like object or path as a string of a destination
"""
async with aiofiles.open(str(path), mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
if supports_argument(dumps, "escape_forward_slashes")
else dumps(data, ensure_ascii=False, indent=4)
)

View File

@ -0,0 +1 @@
from ._functions import supports_argument, nested_set, nested_delete

View File

@ -1,9 +1,30 @@
import inspect
from typing import Any, Dict from typing import Any, Dict
from typing import Callable
def nested_set( def supports_argument(func: Callable, arg_name: str) -> bool:
target: dict, value: Any, *path: str, create_missing=True """Check whether a function has a specific argument
) -> Dict[str, Any]:
### Args:
* func (`Callable`): Function to be inspected
* arg_name (`str`): Argument to be checked
### Returns:
* `bool`: `True` if argument is supported and `False` if not
"""
if hasattr(func, "__code__"):
return arg_name in inspect.signature(func).parameters
if hasattr(func, "__doc__"):
if doc := func.__doc__:
first_line = doc.splitlines()[0]
return arg_name in first_line
return False
def nested_set(target: dict, value: Any, *path: str, create_missing=True) -> Dict[str, Any]:
"""Set the key by its path to the value """Set the key by its path to the value
### Args: ### Args:

View File

@ -0,0 +1 @@
from ._syncs import asyncable

View File

@ -0,0 +1,69 @@
import asyncio
import inspect
from inspect import FrameInfo
from typing import Any, Callable, Optional, Type
class asyncable:
"""Allows to mark a callable able to be async.
Source: https://itsjohannawren.medium.com/single-call-sync-and-async-in-python-2acadd07c9d6"""
def __init__(self, method: Callable):
self.__sync = method
self.__async = None
def asynchronous(self, method: Callable) -> "asyncable":
if not isinstance(method, Callable):
raise RuntimeError("NOT CALLABLE!!!")
self.__async = method
return self
@staticmethod
def __is_awaited() -> bool:
frame: FrameInfo = inspect.stack()[2]
if not hasattr(frame, "positions"):
return False
return (
frame.positions.col_offset >= 6
and frame.code_context[frame.index][frame.positions.col_offset - 6 : frame.positions.col_offset]
== "await "
)
def __get__(
self,
instance: Type,
*args,
owner_class: Optional[Type[Type]] = None,
**kwargs,
) -> Callable:
if self.__is_awaited():
if self.__async is None:
raise RuntimeError(
"Attempting to call asyncable with await, but no asynchronous call has been defined"
)
bound_method = self.__async.__get__(instance, owner_class)
if isinstance(self.__sync, classmethod):
return lambda: asyncio.ensure_future(bound_method(owner_class, *args, **kwargs))
return lambda: asyncio.ensure_future(bound_method(*args, **kwargs))
bound_method = self.__sync.__get__(instance, owner_class)
return lambda: bound_method(*args, **kwargs)
def __call__(self, *args, **kwargs) -> Any:
if self.__is_awaited():
if self.__async is None:
raise RuntimeError(
"Attempting to call asyncable with await, but no asynchronous call has been defined"
)
return asyncio.ensure_future(self.__async(*args, **kwargs))
return self.__sync(*args, **kwargs)

View File

@ -1,8 +1,6 @@
from pathlib import Path from typing import Any, List
from typing import Any, List, Union
import pytest import pytest
from libbot.i18n import BotLocale from libbot.i18n import BotLocale
@ -20,14 +18,12 @@ from libbot.i18n import BotLocale
def test_bot_locale_get( def test_bot_locale_get(
key: str, key: str,
args: List[str], args: List[str],
locale: Union[str, None], locale: str | None,
expected: Any, expected: Any,
bot_locale: BotLocale, bot_locale: BotLocale,
): ):
assert ( assert (
bot_locale._(key, *args, locale=locale) bot_locale._(key, *args, locale=locale) if locale is not None else bot_locale._(key, *args)
if locale is not None
else bot_locale._(key, *args)
) == expected ) == expected
@ -39,9 +35,7 @@ def test_bot_locale_get(
("nested", ["callbacks", "default"], ["sure", "авжеж"]), ("nested", ["callbacks", "default"], ["sure", "авжеж"]),
], ],
) )
def test_i18n_in_all_locales( def test_i18n_in_all_locales(key: str, args: List[str], expected: Any, bot_locale: BotLocale):
key: str, args: List[str], expected: Any, bot_locale: BotLocale
):
assert (bot_locale.in_all_locales(key, *args)) == expected assert (bot_locale.in_all_locales(key, *args)) == expected
@ -53,7 +47,5 @@ def test_i18n_in_all_locales(
("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}), ("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}),
], ],
) )
def test_i18n_in_every_locale( def test_i18n_in_every_locale(key: str, args: List[str], expected: Any, bot_locale: BotLocale):
key: str, args: List[str], expected: Any, bot_locale: BotLocale
):
assert (bot_locale.in_every_locale(key, *args)) == expected assert (bot_locale.in_every_locale(key, *args)) == expected

View File

@ -2,8 +2,7 @@ from pathlib import Path
from typing import Any, List from typing import Any, List
import pytest import pytest
from libbot.utils import config_delete, config_get, config_set
from libbot import config_delete, config_get, config_set
@pytest.mark.asyncio @pytest.mark.asyncio
@ -25,14 +24,9 @@ async def test_config_get_valid(args: List[str], expected: str, location_config:
(["bot_stonks", "bot"], pytest.raises(KeyError)), (["bot_stonks", "bot"], pytest.raises(KeyError)),
], ],
) )
async def test_config_get_invalid( async def test_config_get_invalid(args: List[str], expected: Any, location_config: Path):
args: List[str], expected: Any, location_config: Path
):
with expected: with expected:
assert ( assert await config_get(args[0], *args[1:], config_file=location_config) == expected
await config_get(args[0], *args[1:], config_file=location_config)
== expected
)
@pytest.mark.asyncio @pytest.mark.asyncio
@ -68,7 +62,4 @@ async def test_config_delete(key: str, path: List[str], location_config: Path):
], ],
) )
async def test_config_delete_missing(key: str, path: List[str], location_config: Path): async def test_config_delete_missing(key: str, path: List[str], location_config: Path):
assert ( assert await config_delete(key, *path, missing_ok=True, config_file=location_config) is None
await config_delete(key, *path, missing_ok=True, config_file=location_config)
is None
)

View File

@ -2,8 +2,7 @@ from pathlib import Path
from typing import Any, List from typing import Any, List
import pytest import pytest
from libbot.utils import config_delete, config_get, config_set
from libbot import sync
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -13,8 +12,8 @@ from libbot import sync
(["bot_token", "bot"], "sample_token"), (["bot_token", "bot"], "sample_token"),
], ],
) )
def test_config_get_valid(args: List[str], expected: str, location_config: Path): async def test_config_get_valid(args: List[str], expected: str, location_config: Path):
assert sync.config_get(args[0], *args[1:], config_file=location_config) == expected assert config_get(args[0], *args[1:], config_file=location_config) == expected
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -25,9 +24,7 @@ def test_config_get_valid(args: List[str], expected: str, location_config: Path)
) )
def test_config_get_invalid(args: List[str], expected: Any, location_config: Path): def test_config_get_invalid(args: List[str], expected: Any, location_config: Path):
with expected: with expected:
assert ( assert config_get(args[0], *args[1:], config_file=location_config) == expected
sync.config_get(args[0], *args[1:], config_file=location_config) == expected
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -38,8 +35,8 @@ def test_config_get_invalid(args: List[str], expected: Any, location_config: Pat
], ],
) )
def test_config_set(key: str, path: List[str], value: Any, location_config: Path): def test_config_set(key: str, path: List[str], value: Any, location_config: Path):
sync.config_set(key, value, *path, config_file=location_config) config_set(key, value, *path, config_file=location_config)
assert sync.config_get(key, *path, config_file=location_config) == value assert config_get(key, *path, config_file=location_config) == value
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -49,8 +46,8 @@ def test_config_set(key: str, path: List[str], value: Any, location_config: Path
], ],
) )
def test_config_delete(key: str, path: List[str], location_config: Path): def test_config_delete(key: str, path: List[str], location_config: Path):
sync.config_delete(key, *path, config_file=location_config) config_delete(key, *path, config_file=location_config)
assert key not in sync.config_get(*path, config_file=location_config) assert key not in config_get(*path, config_file=location_config)
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -60,7 +57,4 @@ def test_config_delete(key: str, path: List[str], location_config: Path):
], ],
) )
def test_config_delete_missing(key: str, path: List[str], location_config: Path): def test_config_delete_missing(key: str, path: List[str], location_config: Path):
assert ( assert config_delete(key, *path, missing_ok=True, config_file=location_config) is None
sync.config_delete(key, *path, missing_ok=True, config_file=location_config)
is None
)

View File

@ -1,8 +1,7 @@
from pathlib import Path from pathlib import Path
from typing import Any, List, Union from typing import Any, List
import pytest import pytest
from libbot import i18n from libbot import i18n
@ -21,7 +20,7 @@ from libbot import i18n
async def test_i18n_get( async def test_i18n_get(
key: str, key: str,
args: List[str], args: List[str],
locale: Union[str, None], locale: str | None,
expected: Any, expected: Any,
location_locale: Path, location_locale: Path,
): ):
@ -41,12 +40,8 @@ async def test_i18n_get(
("nested", ["callbacks", "default"], ["sure", "авжеж"]), ("nested", ["callbacks", "default"], ["sure", "авжеж"]),
], ],
) )
async def test_i18n_in_all_locales( async def test_i18n_in_all_locales(key: str, args: List[str], expected: Any, location_locale: Path):
key: str, args: List[str], expected: Any, location_locale: Path assert (await i18n.in_all_locales(key, *args, locales_root=location_locale)) == expected
):
assert (
await i18n.in_all_locales(key, *args, locales_root=location_locale)
) == expected
@pytest.mark.asyncio @pytest.mark.asyncio
@ -58,9 +53,5 @@ async def test_i18n_in_all_locales(
("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}), ("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}),
], ],
) )
async def test_i18n_in_every_locale( async def test_i18n_in_every_locale(key: str, args: List[str], expected: Any, location_locale: Path):
key: str, args: List[str], expected: Any, location_locale: Path assert (await i18n.in_every_locale(key, *args, locales_root=location_locale)) == expected
):
assert (
await i18n.in_every_locale(key, *args, locales_root=location_locale)
) == expected

View File

@ -1,9 +1,8 @@
from pathlib import Path from pathlib import Path
from typing import Any, List, Union from typing import Any, List
import pytest import pytest
from libbot.i18n import _, in_all_locales, in_every_locale
from libbot.i18n import sync
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -20,14 +19,14 @@ from libbot.i18n import sync
def test_i18n_get( def test_i18n_get(
key: str, key: str,
args: List[str], args: List[str],
locale: Union[str, None], locale: str | None,
expected: Any, expected: Any,
location_locale: Path, location_locale: Path,
): ):
assert ( assert (
sync._(key, *args, locale=locale, locales_root=location_locale) _(key, *args, locale=locale, locales_root=location_locale)
if locale is not None if locale is not None
else sync._(key, *args, locales_root=location_locale) else _(key, *args, locales_root=location_locale)
) == expected ) == expected
@ -39,10 +38,8 @@ def test_i18n_get(
("nested", ["callbacks", "default"], ["sure", "авжеж"]), ("nested", ["callbacks", "default"], ["sure", "авжеж"]),
], ],
) )
def test_i18n_in_all_locales( def test_i18n_in_all_locales(key: str, args: List[str], expected: Any, location_locale: Path):
key: str, args: List[str], expected: Any, location_locale: Path assert (in_all_locales(key, *args, locales_root=location_locale)) == expected
):
assert (sync.in_all_locales(key, *args, locales_root=location_locale)) == expected
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -53,7 +50,5 @@ def test_i18n_in_all_locales(
("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}), ("nested", ["callbacks", "default"], {"en": "sure", "uk": "авжеж"}),
], ],
) )
def test_i18n_in_every_locale( def test_i18n_in_every_locale(key: str, args: List[str], expected: Any, location_locale: Path):
key: str, args: List[str], expected: Any, location_locale: Path assert (in_every_locale(key, *args, locales_root=location_locale)) == expected
):
assert (sync.in_every_locale(key, *args, locales_root=location_locale)) == expected

View File

@ -4,11 +4,10 @@ except ImportError:
from json import dumps, JSONDecodeError from json import dumps, JSONDecodeError
from pathlib import Path from pathlib import Path
from typing import Any, Union from typing import Any
import pytest import pytest
from libbot.utils import json_read, json_write
from libbot import json_read, json_write
@pytest.mark.asyncio @pytest.mark.asyncio
@ -25,7 +24,7 @@ from libbot import json_read, json_write
("tests/data/empty.json", {}), ("tests/data/empty.json", {}),
], ],
) )
async def test_json_read_valid(path: Union[str, Path], expected: Any): async def test_json_read_valid(path: str | Path, expected: Any):
assert await json_read(path) == expected assert await json_read(path) == expected
@ -37,7 +36,7 @@ async def test_json_read_valid(path: Union[str, Path], expected: Any):
("tests/data/nonexistent.json", FileNotFoundError), ("tests/data/nonexistent.json", FileNotFoundError),
], ],
) )
async def test_json_read_invalid(path: Union[str, Path], expected: Any): async def test_json_read_invalid(path: str | Path, expected: Any):
with pytest.raises(expected): with pytest.raises(expected):
await json_read(path) await json_read(path)
@ -56,7 +55,7 @@ async def test_json_read_invalid(path: Union[str, Path], expected: Any):
({}, "tests/data/empty.json"), ({}, "tests/data/empty.json"),
], ],
) )
async def test_json_write(data: Any, path: Union[str, Path]): async def test_json_write(data: Any, path: str | Path):
await json_write(data, path) await json_write(data, path)
assert Path(path).is_file() assert Path(path).is_file()

View File

@ -4,11 +4,10 @@ except ImportError:
from json import dumps, JSONDecodeError from json import dumps, JSONDecodeError
from pathlib import Path from pathlib import Path
from typing import Any, Union from typing import Any
import pytest import pytest
from libbot.utils import json_read, json_write
from libbot import sync
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -24,8 +23,8 @@ from libbot import sync
("tests/data/empty.json", {}), ("tests/data/empty.json", {}),
], ],
) )
def test_json_read_valid(path: Union[str, Path], expected: Any): def test_json_read_valid(path: str | Path, expected: Any):
assert sync.json_read(path) == expected assert json_read(path) == expected
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -35,9 +34,9 @@ def test_json_read_valid(path: Union[str, Path], expected: Any):
("tests/data/nonexistent.json", FileNotFoundError), ("tests/data/nonexistent.json", FileNotFoundError),
], ],
) )
def test_json_read_invalid(path: Union[str, Path], expected: Any): def test_json_read_invalid(path: str | Path, expected: Any):
with pytest.raises(expected): with pytest.raises(expected):
assert sync.json_read(path) == expected assert json_read(path) == expected
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -53,8 +52,8 @@ def test_json_read_invalid(path: Union[str, Path], expected: Any):
({}, "tests/data/empty.json"), ({}, "tests/data/empty.json"),
], ],
) )
def test_json_write(data: Any, path: Union[str, Path]): def test_json_write(data: Any, path: str | Path):
sync.json_write(data, path) json_write(data, path)
assert Path(path).is_file() assert Path(path).is_file()
with open(path, "r", encoding="utf-8") as f: with open(path, "r", encoding="utf-8") as f:

View File

@ -1,8 +1,7 @@
from typing import Any, Dict, List from typing import Any, Dict, List
import pytest import pytest
from libbot.utils.misc import nested_delete, nested_set
from libbot.sync._nested import nested_delete, nested_set
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -56,9 +55,7 @@ def test_nested_set_invalid(
expected: Any, expected: Any,
): ):
with pytest.raises(expected): with pytest.raises(expected):
assert ( assert (nested_set(target, value, *path, create_missing=create_missing)) == expected
nested_set(target, value, *path, create_missing=create_missing)
) == expected
@pytest.mark.parametrize( @pytest.mark.parametrize(

View File

@ -1,8 +1,7 @@
from typing import Callable from typing import Callable
import pytest import pytest
from libbot.utils.misc import supports_argument
from libbot._utils import supports_argument
def func1(foo: str, bar: str): def func1(foo: str, bar: str):

View File

@ -1,14 +1,13 @@
[tox] [tox]
minversion = 3.9.0 minversion = 3.11.0
envlist = py39, py310, py311, py312 envlist = py311, py312, py313
isolated_build = true isolated_build = true
[gh-actions] [gh-actions]
python = python =
3.9: py39
3.10: py310
3.11: py311 3.11: py311
3.12: py312 3.12: py312
3.13: py313
[testenv] [testenv]
setenv = setenv =