from os import kill, makedirs
from os import name as osname
from os import path, sep
from sys import exit
from traceback import format_exc, print_exc
from typing import Any
from zipfile import ZipFile

import aiofiles
from ujson import JSONDecodeError, dumps, loads

from modules.logger import logWrite

# Values used by configGet() whenever a key is missing from config.json.
default_config = {
    "locale": "en",
    "locale_log": "en",
    "locale_fallback": "en",
    "owner": 0,
    "admins": [],
    "bot": {"api_id": 0, "api_hash": "", "bot_token": ""},
    "database": {
        "user": None,
        "password": None,
        "host": "127.0.0.1",
        "port": 27017,
        "name": "tgposter",
    },
    "mode": {"post": True, "submit": True},
    "reports": {"sent": False, "error": True, "startup": True, "shutdown": True},
    "logging": {"size": 512, "location": "logs"},
    "locations": {
        "tmp": "tmp",
        "data": "data",
        "cache": "cache",
        "sent": "data/sent",
        "queue": "data/queue",
        "index": "data/index.json",
        "locale": "locale",
    },
    "posting": {
        "channel": 0,
        "silent": False,
        "move_sent": False,
        "use_interval": False,
        "interval": "1h30m",
        "page_size": 300,
        "submitted_caption": {
            "enabled": True,
            "ignore_admins": True,
            "text": "#submitted",
        },
        "extensions": {
            "photo": ["jpg", "png", "gif", "jpeg"],
            "video": ["mp4", "avi", "mkv", "webm", "mov"],
        },
        "time": [
            "08:00",
            "10:00",
            "12:00",
            "14:00",
            "16:00",
            "18:00",
            "20:00",
            "22:00",
        ],
        "api": {
            "address": "http://localhost:8054",
            "address_external": "https://photos.domain.com",
            "username": "",
            "password": "",
            "album": "",
        },
    },
    "caption": {"enabled": False, "link": None, "text": ["sample text"]},
    "submission": {
        "timeout": 30,
        "file_size": 15728640,
        "tmp_size": 15728640,
        "allow_duplicates": False,
        "send_uploaded_id": False,
        "require_confirmation": {"users": True, "admins": True},
        "mime_types": [
            "image/png",
            "image/gif",
            "image/jpeg",
            "video/mp4",
            "video/quicktime",
        ],
    },
    "commands": ["start", "rules"],
    "commands_admin": ["import", "export", "shutdown"],
}


def jsonLoad(filename: str) -> Any:
    """Load a JSON file and return its parsed contents

    Args:
        * filename (str): Path to the JSON file.

    Returns:
        * any: Parsed contents of the file

    Raises:
        * JSONDecodeError: The file is not valid JSON (logged, then re-raised).
        * FileNotFoundError: The file does not exist (logged, then re-raised).
    """
    # open() must be inside the try block, otherwise FileNotFoundError can
    # never reach the handler below.
    try:
        with open(filename, "r", encoding="utf8") as file:
            output = loads(file.read())
    except JSONDecodeError:
        # format_exc() returns the traceback text; print_exc() returns None.
        logWrite(
            f"Could not load json file {filename}: file seems to be incorrect!\n{format_exc()}"
        )
        raise
    except FileNotFoundError:
        logWrite(
            f"Could not load json file {filename}: file does not seem to exist!\n{format_exc()}"
        )
        raise
    return output


def jsonSave(contents: Any, filename: str) -> None:
    """Dump a dict/list to a JSON file

    Failures are logged and swallowed, saving is best-effort.

    Args:
        * contents (dict/list): Data to serialize.
        * filename (str): Path of the file to write.
    """
    try:
        with open(filename, "w", encoding="utf8") as file:
            file.write(
                dumps(
                    contents, ensure_ascii=False, indent=4, escape_forward_slashes=False
                )
            )
    except Exception as exp:
        logWrite(f"Could not save json file {filename}: {exp}\n{format_exc()}")


def configSet(key: str, value, *args: str):
    """Set key to a value

    Args:
        * key (str): The last key of the keys path.
        * value (str/int/float/list/dict/None): Some needed value.
        * *args (str): Path to key like: dict[args][key].
    """
    this_dict = jsonLoad("config.json")

    # Walk the path directly instead of building source code for exec():
    # any JSON-serializable value is stored verbatim, including strings
    # that contain quotes, backslashes or newlines.
    target = this_dict
    for arg in args:
        # Missing sections are created so that configGet() can persist a
        # default even when config.json predates that section.
        target = target.setdefault(arg, {})
    target[key] = value

    jsonSave(this_dict, "config.json")


def configGet(key: str, *args: str):
    """Get value of the config key

    When the key is absent from config.json, the value from default_config
    is written back to config.json and returned.

    Args:
        * key (str): The last key of the keys path.
        * *args (str): Path to key like: dict[args][key].

    Returns:
        * any: Value of provided key

    Raises:
        * KeyError: The key is missing from both config.json and default_config.
    """
    this_dict = jsonLoad("config.json")
    try:
        this_key = this_dict
        for dict_key in args:
            this_key = this_key[dict_key]
        return this_key[key]
    except KeyError:
        print(
            f"Could not find config key '{key}' under path {args}: falling back to default config",
            flush=True,
        )
        this_key = default_config
        for dict_key in args:
            this_key = this_key[dict_key]
        value = this_key[key]
        configSet(key, value, *args)
        return value


def _loadLocaleFile(name: str) -> Any:
    """Load locale file `name`.json from the configured locale directory"""
    return jsonLoad(f'{configGet("locale", "locations")}{sep}{name}.json')


def locale(key: str, *args: str, locale=None):
    """Get value of locale string

    Locale files are tried in this order: the requested locale, the config's
    "locale", the config's "locale_fallback".

    Args:
        * key (str): The last key of the locale's keys path.
        * *args (list): Path to key like: dict[args][key].
        * locale (str): Locale to looked up in. Defaults to config's locale value.

    Returns:
        * any: Value of provided locale key, or a warning string when no
          locale file could be loaded or the key path does not exist
    """
    # The default is resolved per call rather than once at import time, so
    # changes of config's "locale" are picked up without a restart.
    if locale is None:
        locale = configGet("locale")

    warning = f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'

    try:
        this_dict = _loadLocaleFile(locale)
    except FileNotFoundError:
        try:
            this_dict = _loadLocaleFile(configGet("locale"))
        except FileNotFoundError:
            try:
                # "locale_fallback" is a top-level config key; the locale
                # directory is still locations -> locale.
                this_dict = _loadLocaleFile(configGet("locale_fallback"))
            except Exception:
                # Last resort: any failure here yields the warning string.
                return warning

    # A missing section and a missing final key are reported the same way.
    try:
        this_key = this_dict
        for dict_key in args:
            this_key = this_key[dict_key]
        return this_key[key]
    except KeyError:
        return warning


async def extract_and_save(handle: ZipFile, filename: str, destpath: str):
    """Extract and save file from archive

    Args:
        * handle (ZipFile): ZipFile handler
        * filename (str): Name of the archive member to extract
        * destpath (str): Directory where to store the extracted member
    """
    filepath = path.join(destpath, filename)

    # Refuse members whose name (absolute path or "..") would resolve
    # outside of destpath.
    dest_root = path.join(path.abspath(destpath), "")
    if not path.join(path.abspath(filepath), "").startswith(dest_root):
        logWrite(f"Skipped {filename}: it would be extracted outside of {destpath}")
        return

    data = handle.read(filename)

    # Directory entries end with "/" inside archives. Handle them explicitly:
    # opening a directory raises IsADirectoryError only on POSIX.
    if filename.endswith("/"):
        makedirs(filepath, exist_ok=True)
        return

    try:
        makedirs(path.dirname(filepath), exist_ok=True)
        async with aiofiles.open(filepath, "wb") as fd:
            await fd.write(data)
        logWrite(f"Unzipped {filename}", debug=True)
    except IsADirectoryError:
        makedirs(filepath, exist_ok=True)
    except FileNotFoundError as exp:
        # Still best-effort, but no longer silent.
        logWrite(f"Could not unzip {filename}: {exp}")


# psutil is required by killProc() on non-POSIX systems. When it is missing,
# print the localized hint and stop.
try:
    from psutil import Process
except ModuleNotFoundError:
    print(locale("deps_missing", "console", locale=configGet("locale")), flush=True)
    exit()


def killProc(pid: int) -> None:
    """Kill process by its PID. Meant to be used to kill the main process of bot itself.

    Sends SIGKILL through os.kill on POSIX systems and uses psutil's
    Process.kill() everywhere else.

    ### Args:
        * pid (`int`): PID of the target
    """
    if osname == "posix":
        from signal import SIGKILL

        kill(pid, SIGKILL)
    else:
        Process(pid).kill()