This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/modules/utils.py
2022-12-10 17:29:06 +01:00

169 lines
6.0 KiB
Python

from typing import Any, Union
from pyrogram.enums.chat_type import ChatType
from pyrogram.types import User
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
from ujson import JSONDecodeError as JSONDecodeError
from ujson import loads, dumps
from sys import exit
from os import kill, sep
from os import name as osname
from traceback import print_exc
from modules.logging import logWrite
def jsonLoad(filename):
"""Loads arg1 as json and returns its contents"""
with open(filename, "r", encoding='utf8') as file:
try:
output = loads(file.read())
except JSONDecodeError:
logWrite(f"Could not load json file {filename}: file seems to be incorrect!\n{print_exc()}")
raise
except FileNotFoundError:
logWrite(f"Could not load json file {filename}: file does not seem to exist!\n{print_exc()}")
raise
return output
def jsonSave(contents, filename):
"""Dumps dict/list arg1 to file arg2"""
try:
with open(filename, "w", encoding='utf8') as file:
file.write(dumps(contents, ensure_ascii=False, indent=4))
except Exception as exp:
logWrite(f"Could not save json file {filename}: {exp}\n{print_exc()}")
return
def nested_set(dic, keys, value, create_missing=True):
d = dic
for key in keys[:-1]:
if key in d:
d = d[key]
elif create_missing:
d = d.setdefault(key, {})
else:
return dic
if keys[-1] in d or create_missing:
d[keys[-1]] = value
return dic
def configSet(keys: list, value: Any, file: str = "config", create_missing=True):
"""Set config's value to provided one
### Args:
* keys (`list`): List of keys from the highest one to target
* value (`Any`): Needed value
* file (`str`, optional): File (if not config). Defaults to "config".
* create_missing (`bool`, optional): Create missing items on the way. Defaults to True.
"""
if file == "config":
filepath = ""
this_dict = jsonLoad(f"{filepath}{file}.json")
if this_dict["debug"] is True:
try:
this_dict = jsonLoad("config_debug.json")
file = "config_debug"
except FileNotFoundError:
print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)
else:
filepath = f"data{sep}users{sep}"
this_dict = jsonLoad(f"{filepath}{file}.json")
this_dict = nested_set(this_dict, keys, value, create_missing=create_missing)
jsonSave(this_dict, f"{filepath}{file}.json")
return
def configGet(key: str, *args: str, file: str = "config"):
"""Get value of the config key
### Args:
* key (`str`): The last key of the keys path.
* *args (`str`): Path to key like: dict[args][key].
* file (`str`): User ID to load. Loads config if not provided. Defaults to "config".
### Returns:
* any: Value of provided key
"""
if file == "config":
try:
this_dict = jsonLoad("config.json")
except FileNotFoundError:
print("Config file not found! Copy config_example.json to config.json, configure it and rerun the bot!", flush=True)
exit()
if this_dict["debug"] is True:
try:
this_dict = jsonLoad("config_debug.json")
except FileNotFoundError:
print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)
else:
this_dict = jsonLoad(f"data{sep}users{sep}{file}.json")
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
return this_key[key]
def locale(key: str, *args: str, locale: Union[str, User] = configGet("locale")) -> Any:
"""Get value of locale string
### Args:
* key (`str`): The last key of the locale's keys path.
* *args (`list`): Path to key like: dict[args][key].
* locale (`Union[str, User]`): Locale to looked up in. Provide User to get his `.language_code`. Defaults to config's locale value.
### Returns:
* any: Value of provided locale key. In normal case must be `str`, `dict` or `list`.
"""
if isinstance(locale, User):
locale = locale.language_code
if locale is None:
locale = configGet("locale")
try:
this_dict = jsonLoad(f'{configGet("locale", "locations")}{sep}{locale}.json')
except FileNotFoundError:
try:
this_dict = jsonLoad(f'{configGet("locale", "locations")}{sep}{configGet("locale")}.json')
except FileNotFoundError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
this_key = this_dict
for dict_key in args:
this_key = this_key[dict_key]
try:
return this_key[key]
except KeyError:
return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
try:
from psutil import Process
except ModuleNotFoundError:
# print(locale("deps_missing", "console", locale=configGet("locale")), flush=True)
print("Missing dependencies! Please install all needed dependencies and run the bot again!")
exit()
def killProc(pid):
if osname == "posix":
from signal import SIGKILL
kill(pid, SIGKILL)
else:
p = Process(pid)
p.kill()
def should_quote(msg):
return True if msg.chat.type is not ChatType.PRIVATE else False
async def find_user(app: Client, query: Union[str, int]):
try:
result = await app.get_users(int(query))
if result == [] or result == None:
raise TypeError
except (TypeError, ValueError):
try:
result = await app.get_users(query)
except bad_request_400.UsernameNotOccupied:
return None
except bad_request_400.UsernameInvalid:
return None
except bad_request_400.PeerIdInvalid:
return None
return result