Telegram/modules/utils.py

264 lines
9.2 KiB
Python

from typing import Any, Literal, Tuple, Union
from uuid import uuid1
from requests import get
from pyrogram.enums.chat_type import ChatType
from pyrogram.types import User
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
from ujson import JSONDecodeError as JSONDecodeError
from ujson import loads, dumps
from sys import exit
from os import kill, listdir, makedirs, path, sep
from os import name as osname
from traceback import print_exc
from classes.errors.geo import PlaceNotFoundError
from modules.logging import logWrite
def jsonLoad(filename):
    """Load a JSON file and return its parsed contents

    ### Args:
    * filename (`str`): Path to the JSON file. It is opened as UTF-8 text.

    ### Raises:
    * `JSONDecodeError`: File contents are not valid JSON. Logged, then re-raised.
    * `FileNotFoundError`: File does not exist. Logged, then re-raised.

    ### Returns:
    * any: Whatever `loads()` produced for the file's contents
    """
    # format_exc() returns the traceback as text; print_exc() returns None,
    # which used to put the literal "None" into the log message.
    from traceback import format_exc

    # open() is what raises FileNotFoundError, so it has to live inside the
    # try block for the second handler to ever run.
    try:
        with open(filename, "r", encoding='utf8') as file:
            output = loads(file.read())
    except JSONDecodeError:
        logWrite(f"Could not load json file {filename}: file seems to be incorrect!\n{format_exc()}")
        raise
    except FileNotFoundError:
        logWrite(f"Could not load json file {filename}: file does not seem to exist!\n{format_exc()}")
        raise
    return output
def jsonSave(contents, filename):
    """Serialize `contents` as JSON and write it to `filename`

    Saving is best-effort: any exception is logged via `logWrite` and
    swallowed, so the function always returns `None`.

    ### Args:
    * contents (`dict` or `list`): Data to be serialized
    * filename (`str`): Destination path. It is written as UTF-8 text.
    """
    # format_exc() returns the traceback as text; print_exc() returns None,
    # which used to put the literal "None" into the log message.
    from traceback import format_exc

    try:
        # Serialize before opening: mode "w" truncates the file immediately,
        # so a serialization error must not be allowed to happen after open().
        serialized = dumps(contents, ensure_ascii=False, indent=4)
        with open(filename, "w", encoding='utf8') as file:
            file.write(serialized)
    except Exception as exp:
        logWrite(f"Could not save json file {filename}: {exp}\n{format_exc()}")
    return
def nested_set(dic, keys, value, create_missing=True):
    """Set a value inside nested dictionaries, following a path of keys

    ### Args:
    * dic (`dict`): Dictionary to modify. It is changed in place.
    * keys (`list`): Keys from the outermost one down to the target key
    * value (`Any`): Value to store under the last key
    * create_missing (`bool`, optional): Create missing dictionaries and the final key on the way. Defaults to True.

    ### Returns:
    * `dict`: The same `dic` object. It is unchanged if part of the path is missing and `create_missing` is False.
    """
    node = dic
    for step in keys[:-1]:
        if step in node:
            node = node[step]
            continue
        if not create_missing:
            # Part of the path is missing and must not be created
            return dic
        node = node.setdefault(step, {})

    last = keys[-1]
    if last in node or create_missing:
        node[last] = value
    return dic
def configSet(keys: list, value: Any, file: str = "config", create_missing=True):
    """Set a value in the config or in a user's data file and save it

    ### Args:
    * keys (`list`): List of keys from the highest one to target
    * value (`Any`): Needed value
    * file (`str`, optional): File name without extension. Anything other than "config" is looked up in the `data/users` folder. Defaults to "config".
    * create_missing (`bool`, optional): Create missing items on the way. Defaults to True.
    """
    is_main_config = file == "config"
    filepath = "" if is_main_config else f"data{sep}users{sep}"
    this_dict = jsonLoad(f"{filepath}{file}.json")

    # With "debug" enabled the change goes to config_debug.json, if it exists
    if is_main_config and this_dict["debug"] is True:
        try:
            this_dict = jsonLoad("config_debug.json")
        except FileNotFoundError:
            print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)
        else:
            file = "config_debug"

    updated = nested_set(this_dict, keys, value, create_missing=create_missing)
    jsonSave(updated, f"{filepath}{file}.json")
def configGet(key: str, *args: str, file: str = "config"):
    """Read a value from the config or from a user's data file

    ### Args:
    * key (`str`): The last key of the keys path.
    * *args (`str`): Path to key like: dict[args][key].
    * file (`str`): File name without extension. Anything other than "config" is looked up in the `data/users` folder. Defaults to "config".

    ### Returns:
    * any: Value of provided key
    """
    if file != "config":
        this_dict = jsonLoad(f"data{sep}users{sep}{file}.json")
    else:
        try:
            this_dict = jsonLoad("config.json")
        except FileNotFoundError:
            print("Config file not found! Copy config_example.json to config.json, configure it and rerun the bot!", flush=True)
            exit()
        # With "debug" enabled config_debug.json replaces config.json, if it exists
        if this_dict["debug"] is True:
            try:
                this_dict = jsonLoad("config_debug.json")
            except FileNotFoundError:
                print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)

    # Walk the path first, then take the requested key itself
    node = this_dict
    for step in (*args, key):
        node = node[step]
    return node
def locale(key: str, *args: str, locale: Union[str, User] = configGet("locale")) -> Any:
    """Get value of locale string

    ### Args:
    * key (`str`): The last key of the locale's keys path.
    * *args (`list`): Path to key like: dict[args][key].
    * locale (`Union[str, User]`): Locale to look up in. Provide a `User` to use its `.language_code`; any other object with a `.locale` attribute has that attribute used. `None` means the config's locale. Defaults to config's locale value, read when this module is imported.

    ### Returns:
    * any: Value of provided locale key. In normal case must be `str`, `dict` or `list`. A warning string is returned if neither locale file exists or the key path is missing.
    """
    if isinstance(locale, User):
        locale = locale.language_code
    elif hasattr(locale, "locale"):
        locale = locale.locale
    if locale is None:
        locale = configGet("locale")

    not_found = f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
    locales_dir = configGet("locale", "locations")

    try:
        this_dict = jsonLoad(f'{locales_dir}{sep}{locale}.json')
    except FileNotFoundError:
        # Requested locale has no file: fall back to the config's locale
        try:
            this_dict = jsonLoad(f'{locales_dir}{sep}{configGet("locale")}.json')
        except FileNotFoundError:
            return not_found

    # The whole walk is guarded, so a missing intermediate key yields the
    # same warning string as a missing final key instead of raising KeyError.
    try:
        this_key = this_dict
        for dict_key in args:
            this_key = this_key[dict_key]
        return this_key[key]
    except KeyError:
        return not_found
def all_locales(key: str, *args: str) -> list:
    """Get value of the provided key and path in all available locales

    ### Args:
    * key (`str`): The last key of the locale's keys path.
    * *args (`list`): Path to key like: dict[args][key].

    ### Returns:
    * `list`: List of all values in all locales. Locales missing any part of the key path are skipped.
    """
    # Read the folder from config once instead of once per locale file
    locales_dir = configGet("locale", "locations")

    output = []
    for entry in listdir(f'{locales_dir}'):
        # Only *.json entries are locales. Otherwise a file like "uk.txt"
        # would make "uk.json" be loaded and counted a second time.
        if not entry.endswith(".json"):
            continue
        try:
            this_key = jsonLoad(f'{locales_dir}{sep}{entry}')
        except FileNotFoundError:
            continue
        # A locale lacking an intermediate key is skipped exactly like one
        # lacking the final key, instead of aborting the whole lookup.
        try:
            for dict_key in args:
                this_key = this_key[dict_key]
            output.append(this_key[key])
        except KeyError:
            continue
    return output
def find_location(query: str) -> dict:
    """Find location on geonames.org by query. Search is made with feature classes A and P.

    ### Args:
    * query (`str`): Some city/village/state name

    ### Raises:
    * PlaceNotFoundError: Exception is raised when API result is empty or cannot be parsed

    ### Returns:
    * `dict`: One instance of geonames response
    """
    try:
        # The query is passed through `params` so that requests URL-encodes it.
        # Interpolated into the URL, characters like "&" or "#" in the query
        # would add or cut off request parameters.
        # A list of pairs is used because "featureClass" is sent twice.
        params = [
            ("q", query),
            ("maxRows", 1),
            ("countryBias", "UA"),
            ("lang", "uk"),
            ("orderby", "relevance"),
            ("featureClass", "P"),
            ("featureClass", "A"),
            ("username", configGet('username', 'geocoding')),
        ]
        # NOTE(review): no timeout is set, so this call can block indefinitely.
        result = get("http://api.geonames.org/searchJSON", params=params).json()
        return result["geonames"][0]
    except (ValueError, KeyError, IndexError) as exp:
        raise PlaceNotFoundError(query) from exp
def create_tmp(bytedata: Union[bytes, bytearray], kind: Union[Literal["image", "video"], None] = None) -> str:
    """Write bytes to a new uniquely named file in the `tmp` folder

    ### Args:
    * bytedata (`Union[bytes, bytearray]`): Some bytes to be written
    * kind (`Union[Literal["image", "video"], None]`): Kind of upload. Adds `.jpg` for "image" or `.mp4` for "video"; no extension otherwise

    ### Returns:
    * `str`: Path to temporary file
    """
    extension = {"image": ".jpg", "video": ".mp4"}.get(kind, "")
    target = path.join("tmp", f"{uuid1()}{extension}")

    # The folder is created on demand, relative to the working directory
    makedirs("tmp", exist_ok=True)
    with open(target, "wb") as handle:
        handle.write(bytedata)
    return target
async def download_tmp(app: Client, file_id: str) -> Tuple[str, bytes]:
    """Download a file into the `tmp` folder and read it back

    ### Args:
    * app (`Client`): App that will download the file
    * file_id (`str`): ID of the file, passed to `app.download_media`

    ### Returns:
    * `Tuple[str, bytes]`: First is a filepath and the second is file's bytes
    """
    target = path.join("tmp", str(uuid1()))
    makedirs("tmp", exist_ok=True)

    await app.download_media(file_id, target)

    # The downloaded file is kept on disk; removing it is up to the caller
    with open(target, "rb") as handle:
        return target, handle.read()
# Within this module `Process` is used by killProc() on non-POSIX systems.
# A missing psutil stops the program at import time. The message is a plain
# string here; an earlier variant looked it up via
# locale("deps_missing", "console").
try:
    from psutil import Process
except ModuleNotFoundError:
    print("Missing dependencies! Please install all needed dependencies and run the bot again!")
    exit()
def killProc(pid):
    """Forcefully terminate a process by its ID

    On POSIX systems `SIGKILL` is sent with `os.kill`; elsewhere
    psutil's `Process.kill()` is used.

    ### Args:
    * pid (`int`): ID of the process to kill
    """
    if osname != "posix":
        Process(pid).kill()
        return

    # Imported here because SIGKILL is only needed on the POSIX branch
    from signal import SIGKILL
    kill(pid, SIGKILL)
def should_quote(msg):
    """Tell whether a message comes from a non-private chat

    Presumably used to decide whether a reply should quote the original message.

    ### Args:
    * msg: Object whose `.chat.type` is compared with `ChatType.PRIVATE`

    ### Returns:
    * `bool`: False for private chats, True for any other chat type
    """
    return msg.chat.type is not ChatType.PRIVATE
async def find_user(app: Client, query: Union[str, int]):
    """Look a user up by numeric ID first and by the raw query second

    ### Args:
    * app (`Client`): App used to call `get_users`
    * query (`Union[str, int]`): User ID, or any other identifier accepted by `app.get_users` (such as a username)

    ### Returns:
    * Result of `app.get_users`, or `None` if the fallback lookup fails with `UsernameNotOccupied`, `UsernameInvalid` or `PeerIdInvalid`
    """
    try:
        result = await app.get_users(int(query))
    except (TypeError, ValueError, bad_request_400.PeerIdInvalid):
        # Not a number, or an ID the lookup rejects. PeerIdInvalid used to
        # escape from here even though the fallback below treats the same
        # error as "not found".
        result = None

    if not (result is None or result == []):
        return result

    # Numeric lookup gave nothing: retry with the query exactly as provided
    try:
        return await app.get_users(query)
    except (
        bad_request_400.UsernameNotOccupied,
        bad_request_400.UsernameInvalid,
        bad_request_400.PeerIdInvalid,
    ):
        return None