WIP: Overhaul for 4.0.0

This commit is contained in:
kku
2024-12-18 14:16:37 +01:00
parent 5e479ddc79
commit 95d04308bd
32 changed files with 718 additions and 688 deletions

View File

@@ -0,0 +1,3 @@
from . import misc
from .config import config_get, config_set, config_delete
from .json import json_read, json_write

View File

@@ -0,0 +1 @@
from ._functions import config_get, config_set, config_delete

View File

@@ -0,0 +1,185 @@
from pathlib import Path
from typing import Any, Union, Dict
from ..json import json_read, json_write
from ..misc import nested_delete, nested_set
from ..syncs import asyncable
try:
from ujson import dumps, loads
except ImportError:
from json import dumps, loads
DEFAULT_CONFIG_LOCATION: str = "config.json"
@asyncable
def config_get(key: str, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION) -> Any:
"""Get a value of the config key by its path provided
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
### Args:
* key (`str`): Key that contains the value
* *path (`str`): Path to the key that contains the value
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Returns:
* `Any`: Key's value
### Example:
Get the "salary" of "Pete" from this JSON structure:
```json
{
"users": {
"Pete": {
"salary": 10.0
}
}
}
```
This can be easily done with the following code:
```python
import libbot
salary = libbot.sync.config_get("salary", "users", "Pete")
```
"""
this_key: Dict[str, Any] = json_read(config_file)
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
@config_get.asynchronous
async def config_get(key: str, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION) -> Any:
"""Get a value of the config key by its path provided
For example, `foo.bar.key` has a path of `"foo", "bar"` and the key `"key"`
### Args:
* key (`str`): Key that contains the value
* *path (`str`): Path to the key that contains the value
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Returns:
* `Any`: Key's value
### Example:
Get the "salary" of "Pete" from this JSON structure:
```json
{
"users": {
"Pete": {
"salary": 10.0
}
}
}
```
This can be easily done with the following code:
```python
import libbot
salary = await libbot.config_get("salary", "users", "Pete")
```
"""
this_key: Dict[str, Any] = await json_read(config_file)
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
@asyncable
def config_set(
key: str, value: Any, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION
) -> None:
"""Set config's key by its path to the value
### Args:
* key (`str`): Key that leads to the value
* value (`Any`): Any JSON serializable data
* *path (`str`): Path to the key of the target
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided
"""
json_write(nested_set(json_read(config_file), value, *(*path, key)), config_file)
@config_set.asynchronous
async def config_set(
key: str, value: Any, *path: str, config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION
) -> None:
"""Set config's key by its path to the value
### Args:
* key (`str`): Key that leads to the value
* value (`Any`): Any JSON serializable data
* *path (`str`): Path to the key of the target
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided
"""
await json_write(nested_set(await json_read(config_file), value, *(*path, key)), config_file)
@asyncable
def config_delete(
key: str,
*path: str,
missing_ok: bool = False,
config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION,
) -> None:
"""Set config's key by its path
### Args:
* key (`str`): Key to delete
* *path (`str`): Path to the key of the target
* missing_ok (`bool`): Do not raise an exception if the key is missing. Defaults to `False`
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided and `missing_ok` is `False`
"""
config_data: Dict[str, Any] = json_read(config_file)
try:
nested_delete(config_data, *(*path, key))
except KeyError as exc:
if not missing_ok:
raise exc from exc
json_write(config_data, config_file)
@config_delete.asynchronous
async def config_delete(
key: str,
*path: str,
missing_ok: bool = False,
config_file: Union[str, Path] = DEFAULT_CONFIG_LOCATION,
) -> None:
"""Set config's key by its path
### Args:
* key (`str`): Key to delete
* *path (`str`): Path to the key of the target
* missing_ok (`bool`): Do not raise an exception if the key is missing. Defaults to `False`
* config_file (`Union[str, Path]`, *optional*): Path-like object or path as a string of a location of the config file. Defaults to `"config.json"`
### Raises:
* `KeyError`: Key is not found under path provided and `missing_ok` is `False`
"""
config_data: Dict[str, Any] = await json_read(config_file)
try:
nested_delete(config_data, *(*path, key))
except KeyError as exc:
if not missing_ok:
raise exc from exc
await json_write(config_data, config_file)

View File

@@ -0,0 +1 @@
from ._functions import json_read, json_write

View File

@@ -0,0 +1,76 @@
from pathlib import Path
from typing import Any, Union
import aiofiles
from ..misc import supports_argument
from ..syncs import asyncable
try:
from ujson import dumps, loads
except ImportError:
from json import dumps, loads
@asyncable
def json_read(path: Union[str, Path]) -> Any:
"""Read contents of a JSON file
### Args:
* path (`Union[str, Path]`): Path-like object or path as a string
### Returns:
* `Any`: File contents
"""
with open(str(path), mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
@json_read.asynchronous
async def json_read(path: Union[str, Path]) -> Any:
"""Read contents of a JSON file
### Args:
* path (`Union[str, Path]`): Path-like object or path as a string
### Returns:
* `Any`: File contents
"""
async with aiofiles.open(str(path), mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
@asyncable
def json_write(data: Any, path: Union[str, Path]) -> None:
"""Write contents to a JSON file
### Args:
* data (`Any`): Contents to write. Must be a JSON serializable
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
"""
with open(str(path), mode="w", encoding="utf-8") as f:
f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
if supports_argument(dumps, "escape_forward_slashes")
else dumps(data, ensure_ascii=False, indent=4)
)
@json_write.asynchronous
async def json_write(data: Any, path: Union[str, Path]) -> None:
"""Write contents to a JSON file
### Args:
* data (`Any`): Contents to write. Must be a JSON serializable
* path (`Union[str, Path]`): Path-like object or path as a string of a destination
"""
async with aiofiles.open(str(path), mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
if supports_argument(dumps, "escape_forward_slashes")
else dumps(data, ensure_ascii=False, indent=4)
)

View File

@@ -0,0 +1 @@
from ._functions import supports_argument, nested_set, nested_delete

View File

@@ -0,0 +1,85 @@
import inspect
from typing import Any, Dict
from typing import Callable
def supports_argument(func: Callable, arg_name: str) -> bool:
"""Check whether a function has a specific argument
### Args:
* func (`Callable`): Function to be inspected
* arg_name (`str`): Argument to be checked
### Returns:
* `bool`: `True` if argument is supported and `False` if not
"""
if hasattr(func, "__code__"):
return arg_name in inspect.signature(func).parameters
if hasattr(func, "__doc__"):
if doc := func.__doc__:
first_line = doc.splitlines()[0]
return arg_name in first_line
return False
def nested_set(target: dict, value: Any, *path: str, create_missing=True) -> Dict[str, Any]:
"""Set the key by its path to the value
### Args:
* target (`dict`): Dictionary to perform modifications on
* value (`Any`): Any data
* *path (`str`): Path to the key of the target
* create_missing (`bool`, *optional*): Create keys on the way if they're missing. Defaults to `True`
### Raises:
* `KeyError`: Key is not found under path provided
### Returns:
* `Dict[str, Any]`: Changed dictionary
"""
d = target
for key in path[:-1]:
if key in d:
d = d[key]
elif create_missing:
d = d.setdefault(key, {})
else:
raise KeyError(
f"Key '{key}' is not found under path provided ({path}) and create_missing is False"
)
if path[-1] in d or create_missing:
d[path[-1]] = value
return target
def nested_delete(target: dict, *path: str) -> Dict[str, Any]:
"""Delete the key by its path
### Args:
* target (`dict`): Dictionary to perform modifications on
### Raises:
* `KeyError`: Key is not found under path provided
### Returns:
`Dict[str, Any]`: Changed dictionary
"""
d = target
for key in path[:-1]:
if key in d:
d = d[key]
else:
raise KeyError(f"Key '{key}' is not found under path provided ({path})")
if path[-1] in d:
del d[path[-1]]
else:
raise KeyError(f"Key '{path[-1]}' is not found under path provided ({path})")
return target

View File

@@ -0,0 +1 @@
from ._syncs import asyncable

View File

@@ -0,0 +1,69 @@
import asyncio
import inspect
from inspect import FrameInfo
from typing import Any, Callable, Optional, Type
class asyncable:
"""Allows to mark a callable able to be async.
Source: https://itsjohannawren.medium.com/single-call-sync-and-async-in-python-2acadd07c9d6"""
def __init__(self, method: Callable):
self.__sync = method
self.__async = None
def asynchronous(self, method: Callable) -> "asyncable":
if not isinstance(method, Callable):
raise RuntimeError("NOT CALLABLE!!!")
self.__async = method
return self
@staticmethod
def __is_awaited() -> bool:
frame: FrameInfo = inspect.stack()[2]
if not hasattr(frame, "positions"):
return False
return (
frame.positions.col_offset >= 6
and frame.code_context[frame.index][frame.positions.col_offset - 6 : frame.positions.col_offset]
== "await "
)
def __get__(
self,
instance: Type,
*args,
owner_class: Optional[Type[Type]] = None,
**kwargs,
) -> Callable:
if self.__is_awaited():
if self.__async is None:
raise RuntimeError(
"Attempting to call asyncable with await, but no asynchronous call has been defined"
)
bound_method = self.__async.__get__(instance, owner_class)
if isinstance(self.__sync, classmethod):
return lambda: asyncio.ensure_future(bound_method(owner_class, *args, **kwargs))
return lambda: asyncio.ensure_future(bound_method(*args, **kwargs))
bound_method = self.__sync.__get__(instance, owner_class)
return lambda: bound_method(*args, **kwargs)
def __call__(self, *args, **kwargs) -> Any:
if self.__is_awaited():
if self.__async is None:
raise RuntimeError(
"Attempting to call asyncable with await, but no asynchronous call has been defined"
)
return asyncio.ensure_future(self.__async(*args, **kwargs))
return self.__sync(*args, **kwargs)