from typing import Any, Literal, Tuple, Union
from uuid import uuid1
from requests import get
from pyrogram.enums.chat_type import ChatType
from pyrogram.types import User
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
from ujson import JSONDecodeError as JSONDecodeError
from ujson import loads, dumps
from sys import exit
from os import kill, listdir, makedirs, path, sep
from os import name as osname
from traceback import format_exc, print_exc
from classes.errors.geo import PlaceNotFoundError
from modules.logging import logWrite


def jsonLoad(filename):
    """Load a JSON file and return its parsed contents

    ### Args:
        * filename (`str`): Path to the JSON file.

    ### Raises:
        * JSONDecodeError: File exists but its content is not valid JSON (logged, then re-raised).
        * FileNotFoundError: File does not exist.

    ### Returns:
        * any: Parsed contents of the file
    """
    # NOTE: open() sits outside the try block, so a missing file propagates
    # FileNotFoundError to the caller without being logged here. Callers in
    # this module rely on that exception for their fallback logic.
    with open(filename, "r", encoding='utf8') as file:
        try:
            output = loads(file.read())
        except JSONDecodeError:
            # format_exc() returns the traceback text; print_exc() returns None.
            logWrite(f"Could not load json file {filename}: file seems to be incorrect!\n{format_exc()}")
            raise
        except FileNotFoundError:
            logWrite(f"Could not load json file {filename}: file does not seem to exist!\n{format_exc()}")
            raise
    return output


def jsonSave(contents, filename):
    """Dump a dict/list to a JSON file

    Failures are logged and swallowed (best effort), nothing is raised.

    ### Args:
        * contents (`Union[dict, list]`): Data to be serialized.
        * filename (`str`): Path of the file to write.
    """
    try:
        with open(filename, "w", encoding='utf8') as file:
            file.write(dumps(contents, ensure_ascii=False, indent=4))
    except Exception as exp:
        # format_exc() returns the traceback text; print_exc() returns None.
        logWrite(f"Could not save json file {filename}: {exp}\n{format_exc()}")
    return


def nested_set(dic, keys, value, create_missing=True):
    """Set a value inside nested dictionaries following a path of keys

    ### Args:
        * dic (`dict`): Dictionary to be modified in place.
        * keys (`list`): Path of keys from the highest one to the target.
        * value (`Any`): Value to store under the last key.
        * create_missing (`bool`, optional): Create missing items on the way. Defaults to True.

    ### Returns:
        * `dict`: The same `dic` object (unchanged if the path is missing and `create_missing` is False)
    """
    d = dic
    for key in keys[:-1]:
        if key in d:
            d = d[key]
        elif create_missing:
            d = d.setdefault(key, {})
        else:
            # Path does not exist and we must not create it: leave as is.
            return dic
    if keys[-1] in d or create_missing:
        d[keys[-1]] = value
    return dic


def configSet(keys: list, value: Any, file: str = "config", create_missing=True):
    """Set config's value to provided one

    ### Args:
        * keys (`list`): List of keys from the highest one to target
        * value (`Any`): Needed value
        * file (`str`, optional): File (if not config). Defaults to "config".
        * create_missing (`bool`, optional): Create missing items on the way. Defaults to True.
    """
    if file == "config":
        filepath = ""
        this_dict = jsonLoad(f"{filepath}{file}.json")
        if this_dict["debug"] is True:
            # In debug mode the change is written to config_debug.json instead.
            try:
                this_dict = jsonLoad("config_debug.json")
                file = "config_debug"
            except FileNotFoundError:
                print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)
    else:
        filepath = f"data{sep}users{sep}"
        this_dict = jsonLoad(f"{filepath}{file}.json")

    this_dict = nested_set(this_dict, keys, value, create_missing=create_missing)
    jsonSave(this_dict, f"{filepath}{file}.json")
    return


def configGet(key: str, *args: str, file: str = "config"):
    """Get value of the config key

    ### Args:
        * key (`str`): The last key of the keys path.
        * *args (`str`): Path to key like: dict[args][key].
        * file (`str`): User ID to load. Loads config if not provided. Defaults to "config".

    ### Returns:
        * any: Value of provided key
    """
    if file == "config":
        try:
            this_dict = jsonLoad("config.json")
        except FileNotFoundError:
            print("Config file not found! Copy config_example.json to config.json, configure it and rerun the bot!", flush=True)
            exit()
        if this_dict["debug"] is True:
            try:
                this_dict = jsonLoad("config_debug.json")
            except FileNotFoundError:
                print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)
    else:
        this_dict = jsonLoad(f"data{sep}users{sep}{file}.json")

    this_key = this_dict
    for dict_key in args:
        this_key = this_key[dict_key]
    return this_key[key]


def locale(key: str, *args: str, locale: Union[str, User, None] = None) -> Any:
    """Get value of locale string

    ### Args:
        * key (`str`): The last key of the locale's keys path.
        * *args (`list`): Path to key like: dict[args][key].
        * locale (`Union[str, User, HoloUser]`): Locale to looked up in. Provide User to get his `.language_code`. Defaults to config's locale value.

    ### Returns:
        * any: Value of provided locale key. In normal case must be `str`, `dict` or `list`.
    """
    if isinstance(locale, User):
        locale = locale.language_code
    elif hasattr(locale, "locale"):
        locale = locale.locale

    # The default is resolved at call time, so importing this module does not
    # read the config and later config changes are picked up.
    if locale is None:
        locale = configGet("locale")

    not_found = f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
    locale_dir = configGet("locale", "locations")

    try:
        this_dict = jsonLoad(f'{locale_dir}{sep}{locale}.json')
    except FileNotFoundError:
        # Requested locale has no file: fall back to the config's default one.
        try:
            this_dict = jsonLoad(f'{locale_dir}{sep}{configGet("locale")}.json')
        except FileNotFoundError:
            return not_found

    try:
        this_key = this_dict
        for dict_key in args:
            this_key = this_key[dict_key]
        return this_key[key]
    except (KeyError, TypeError):
        # A missing or non-dict item anywhere on the path gives the same
        # warning string as a missing final key.
        return not_found


def all_locales(key: str, *args: str) -> list:
    """Get value of the provided key and path in all available locales

    ### Args:
        * key (`str`): The last key of the locale's keys path.
        * *args (`list`): Path to key like: dict[args][key].

    ### Returns:
        * `list`: List of all values in all locales
    """
    output = []
    locale_dir = configGet("locale", "locations")

    for entry in listdir(locale_dir):
        # Only JSON files are locales; this also avoids loading the same
        # locale twice when e.g. "en.json" and "en.txt" sit side by side.
        if not entry.endswith(".json"):
            continue
        try:
            this_dict = jsonLoad(f'{locale_dir}{sep}{entry}')
        except FileNotFoundError:
            continue
        try:
            this_key = this_dict
            for dict_key in args:
                this_key = this_key[dict_key]
            output.append(this_key[key])
        except (KeyError, TypeError):
            # This locale lacks the requested path: skip it.
            continue

    return output


def find_location(query: str) -> dict:
    """Find location on geonames by query. Search is made with feature classes A and P.

    ### Args:
        * query (`str`): Some city/village/state name

    ### Raises:
        * PlaceNotFoundError: Exception is raised when API result is empty

    ### Returns:
        * `dict`: One instance of geonames response
    """
    # NOTE(review): endpoint restored as the GeoNames JSON search service;
    # confirm it matches the deployment's expected URL.
    try:
        # Passing the query through `params` makes requests URL-encode it.
        result = get(
            "http://api.geonames.org/searchJSON",
            params={
                "q": query,
                "maxRows": 1,
                "countryBias": "UA",
                "lang": "uk",
                "orderby": "relevance",
                "featureClass": ["P", "A"],
                "username": configGet("username", "geocoding"),
            },
        ).json()
        return result["geonames"][0]
    except (ValueError, KeyError, IndexError) as exp:
        raise PlaceNotFoundError(query) from exp


def create_tmp(bytedata: Union[bytes, bytearray], kind: Union[Literal["image", "video"], None] = None) -> str:
    """Create temporary file to help uploading it

    ### Args:
        * bytedata (`Union[bytes, bytearray]`): Some bytes to be written
        * kind (`Union[Literal["image", "video"], None]`): Kind of upload. Will add `.jpg` or `.mp4` if needed

    ### Returns:
        * `str`: Path to temporary file
    """
    filename = str(uuid1())
    if kind == "image":
        filename += ".jpg"
    elif kind == "video":
        filename += ".mp4"

    makedirs("tmp", exist_ok=True)
    filepath = path.join("tmp", filename)
    with open(filepath, "wb") as file:
        file.write(bytedata)
    return filepath


async def download_tmp(app: Client, file_id: str) -> Tuple[str, bytes]:
    """Download file by its ID and return its bytes

    ### Args:
        * app (`Client`): App that will download the file
        * file_id (`str`): File's unique id

    ### Returns:
        * `Tuple[str, bytes]`: First is a filepath and the second is file's bytes
    """
    filepath = path.join("tmp", str(uuid1()))
    makedirs("tmp", exist_ok=True)
    await app.download_media(file_id, filepath)
    with open(filepath, "rb") as f:
        bytedata = f.read()
    return filepath, bytedata


try:
    from psutil import Process
except ModuleNotFoundError:
    # print(locale("deps_missing", "console", locale=configGet("locale")), flush=True)
    print("Missing dependencies! Please install all needed dependencies and run the bot again!")
    exit()


def killProc(pid):
    """Forcefully kill a process by its PID

    Uses SIGKILL on POSIX systems and psutil's `Process.kill()` elsewhere.

    ### Args:
        * pid (`int`): ID of the process to kill
    """
    if osname == "posix":
        # SIGKILL is imported here because it is only used on POSIX.
        from signal import SIGKILL
        kill(pid, SIGKILL)
    else:
        Process(pid).kill()


def should_quote(msg):
    """Tell whether a reply to the message should quote it

    ### Args:
        * msg: Message to check (presumably a pyrogram `Message`; only `.chat.type` is read)

    ### Returns:
        * `bool`: True if the chat is anything but a private one
    """
    return msg.chat.type is not ChatType.PRIVATE


async def find_user(app: Client, query: Union[str, int]):
    """Find a user by numeric ID or by username

    The query is first tried as an integer ID. If it is not a number, or the
    lookup returns nothing, it is retried as given (e.g. a username).

    ### Args:
        * app (`Client`): App that will perform the lookup
        * query (`Union[str, int]`): User ID or username

    ### Returns:
        * Result of `app.get_users()` or `None` if the second lookup fails with an unknown/invalid username or peer ID
    """
    try:
        result = await app.get_users(int(query))
        if result is None or result == []:
            # Empty result: handle it like a non-numeric query.
            raise TypeError
    except (TypeError, ValueError):
        try:
            result = await app.get_users(query)
        except (
            bad_request_400.UsernameNotOccupied,
            bad_request_400.UsernameInvalid,
            bad_request_400.PeerIdInvalid,
        ):
            return None
    return result