# This repository has been archived on 2024-08-21. You can view files and clone it, but cannot push or open issues or pull requests.
#
# 253 lines
# 7.2 KiB
# Python

from os import kill, makedirs
from os import name as osname
from os import path, sep
from sys import exit
from traceback import format_exc, print_exc
from typing import Any
from zipfile import ZipFile

import aiofiles
from ujson import JSONDecodeError, dumps, loads

from modules.logger import logWrite
# Built-in configuration defaults. configGet() falls back to these values
# (and writes them back to config.json) when a key is missing from the
# user's config file.
default_config = {
    # Locale names: user-facing, log output, and last-resort fallback.
    "locale": "en",
    "locale_log": "en",
    "locale_fallback": "en",
    # Privileged users.
    "owner": 0,
    "admins": [],
    # Bot credentials.
    "bot": {
        "api_id": 0,
        "api_hash": "",
        "bot_token": "",
    },
    # Database connection settings.
    "database": {
        "user": None,
        "password": None,
        "host": "127.0.0.1",
        "port": 27017,
        "name": "tgposter",
    },
    # Feature toggles.
    "mode": {
        "post": True,
        "submit": True,
    },
    "reports": {
        "sent": False,
        "error": True,
        "startup": True,
        "shutdown": True,
    },
    "logging": {
        "size": 512,
        "location": "logs",
    },
    # Filesystem locations used by the application.
    "locations": {
        "tmp": "tmp",
        "data": "data",
        "cache": "cache",
        "sent": "data/sent",
        "queue": "data/queue",
        "index": "data/index.json",
        "locale": "locale",
    },
    # Posting behaviour and schedule.
    "posting": {
        "channel": 0,
        "silent": False,
        "move_sent": False,
        "use_interval": False,
        "interval": "1h30m",
        "page_size": 300,
        "submitted_caption": {
            "enabled": True,
            "ignore_admins": True,
            "text": "#submitted",
        },
        "extensions": {
            "photo": ["jpg", "png", "gif", "jpeg"],
            "video": ["mp4", "avi", "mkv", "webm", "mov"],
        },
        "time": [
            "08:00",
            "10:00",
            "12:00",
            "14:00",
            "16:00",
            "18:00",
            "20:00",
            "22:00",
        ],
        "api": {
            "address": "http://localhost:8054",
            "address_external": "https://photos.domain.com",
            "username": "",
            "password": "",
            "album": "",
        },
    },
    "caption": {
        "enabled": False,
        "link": None,
        "text": ["sample text"],
    },
    # Submission limits and rules.
    "submission": {
        "timeout": 30,
        "file_size": 15 * 1024 * 1024,  # 15728640
        "tmp_size": 15 * 1024 * 1024,  # 15728640
        "allow_duplicates": False,
        "send_uploaded_id": False,
        "require_confirmation": {
            "users": True,
            "admins": True,
        },
        "mime_types": [
            "image/png",
            "image/gif",
            "image/jpeg",
            "video/mp4",
            "video/quicktime",
        ],
    },
    # Command names.
    "commands": ["start", "rules"],
    "commands_admin": ["import", "export", "shutdown"],
}
def jsonLoad(filename: str) -> Any:
    """Load a JSON file and return its parsed contents

    Args:
        * filename (str): Path of the JSON file to read (opened as UTF-8).

    Returns:
        * any: The value produced by parsing the file's contents.

    Raises:
        * JSONDecodeError: The file is not valid JSON (logged, then re-raised).
        * FileNotFoundError: The file does not exist (logged, then re-raised).
    """
    try:
        # open() must be inside the try block, otherwise a missing file can
        # never reach the FileNotFoundError handler below.
        with open(filename, "r", encoding="utf8") as file:
            output = loads(file.read())
    except JSONDecodeError:
        # format_exc() returns the traceback as text; print_exc() returns
        # None and would put the literal "None" into the log message.
        logWrite(
            f"Could not load json file {filename}: file seems to be incorrect!\n{format_exc()}"
        )
        raise
    except FileNotFoundError:
        logWrite(
            f"Could not load json file {filename}: file does not seem to exist!\n{format_exc()}"
        )
        raise
    return output
def jsonSave(contents: Any, filename: str) -> None:
    """Serialize contents as JSON and write them to a file

    Failures are logged and swallowed: this function never raises for
    ordinary errors, so callers cannot tell from the return value whether
    the save succeeded.

    Args:
        * contents (any): JSON-serializable object (typically dict/list).
        * filename (str): Path of the file to write (UTF-8, overwritten).
    """
    try:
        # Serialize BEFORE opening the file: mode "w" truncates on open, so
        # a serialization error after that point would leave an empty file.
        serialized = dumps(
            contents, ensure_ascii=False, indent=4, escape_forward_slashes=False
        )
        with open(filename, "w", encoding="utf8") as file:
            file.write(serialized)
    except Exception as exp:
        # Deliberate best-effort: log with the full traceback text
        # (format_exc() returns it; print_exc() would return None).
        logWrite(f"Could not save json file {filename}: {exp}\n{format_exc()}")
def configSet(key: str, value, *args: str):
    """Set key to a value

    Loads "config.json", assigns the value at the given path and saves the
    file back.

    Args:
        * key (str): The last key of the keys path.
        * value (str/int/float/list/dict/None): Some needed value.
        * *args (str): Path to key like: dict[args][key].
    """
    this_dict = jsonLoad("config.json")

    # Walk down to the parent mapping directly instead of building Python
    # source and exec()-ing it: that approach broke on strings containing
    # quotes/backslashes/newlines and could execute code embedded in values.
    target = this_dict
    for arg in args:
        # Create missing intermediate sections so a default can be written
        # even when the whole section is absent from the config file.
        target = target.setdefault(arg, {})
    target[key] = value

    jsonSave(this_dict, "config.json")
def configGet(key: str, *args: str):
    """Look up a value in "config.json"

    The file is read on every call. When the key (or any part of its path)
    is missing, the value is taken from `default_config`, written back to
    the config file through `configSet`, and returned.

    Args:
        * key (str): The last key of the keys path.
        * *args (str): Path to key like: dict[args][key].

    Returns:
        * any: Value of provided key
    """
    loaded = jsonLoad("config.json")
    try:
        section = loaded
        for part in args:
            section = section[part]
        value = section[key]
    except KeyError:
        print(
            f"Could not find config key '{key}' under path {args}: falling back to default config",
            flush=True,
        )
        # Resolve the same path in the built-in defaults; a KeyError here
        # (key unknown to the defaults as well) propagates to the caller.
        section = default_config
        for part in args:
            section = section[part]
        value = section[key]
        configSet(key, value, *args)
    return value
def locale(key: str, *args: str, locale=configGet("locale")):
    """Get value of locale string

    Locale files are tried in this order: the requested locale, the
    config's "locale", then the config's "locale_fallback". All of them are
    looked up in the directory configured as locations -> locale.

    Args:
        * key (str): The last key of the locale's keys path.
        * *args (list): Path to key like: dict[args][key].
        * locale (str): Locale to looked up in. Defaults to config's locale value.

    Returns:
        * any: Value of provided locale key, or a warning string when no
          locale file could be loaded or the key path does not exist.
    """
    # NOTE: the default above is evaluated once, at import time. Pass
    # locale=None to force a fresh lookup of the configured locale.
    if locale is None:
        locale = configGet("locale")

    error_text = f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
    locale_dir = configGet("locale", "locations")

    try:
        this_dict = jsonLoad(f"{locale_dir}{sep}{locale}.json")
    except FileNotFoundError:
        try:
            this_dict = jsonLoad(f'{locale_dir}{sep}{configGet("locale")}.json')
        except FileNotFoundError:
            try:
                # Last resort: the fallback locale file inside the regular
                # locale directory (the directory and the locale name were
                # previously swapped, so this step could never succeed).
                this_dict = jsonLoad(
                    f'{locale_dir}{sep}{configGet("locale_fallback")}.json'
                )
            except Exception:
                # Best effort: any failure at this point yields the warning.
                return error_text

    try:
        this_key = this_dict
        for dict_key in args:
            this_key = this_key[dict_key]
        return this_key[key]
    except (KeyError, TypeError):
        # Missing key anywhere along the path, or a path component that is
        # not a mapping.
        return error_text
async def extract_and_save(handle: ZipFile, filename: str, destpath: str):
    """Extract and save file from archive

    Args:
        * handle (ZipFile): ZipFile handler
        * filename (str): Name of the member inside the archive; also used
          as the relative path under `destpath`
        * destpath (str): Path where to store
    """
    filepath = path.join(destpath, filename)

    # Member names come from the archive and are untrusted: refuse anything
    # that would resolve outside of destpath ("../" components, absolute
    # paths), otherwise arbitrary files could be overwritten.
    dest_root = path.realpath(destpath)
    resolved = path.realpath(filepath)
    if resolved != dest_root and not resolved.startswith(path.join(dest_root, "")):
        logWrite(f"Refused to unzip {filename}: path is outside of {destpath}")
        return

    data = handle.read(filename)
    try:
        makedirs(path.dirname(filepath), exist_ok=True)
        async with aiofiles.open(filepath, "wb") as fd:
            await fd.write(data)
        logWrite(f"Unzipped {filename}", debug=True)
    except IsADirectoryError:
        # The target is a directory (e.g. a directory entry in the
        # archive): make sure it exists instead of writing a file.
        makedirs(filepath, exist_ok=True)
    except FileNotFoundError as exp:
        # Still best-effort, but leave a trace instead of failing silently.
        logWrite(f"Could not unzip {filename}: {exp}", debug=True)
# psutil provides Process, which killProc() uses on non-POSIX systems.
# Without it, print the localized "deps_missing" console message and stop.
try:
    from psutil import Process
except ModuleNotFoundError:
    _deps_message = locale("deps_missing", "console", locale=configGet("locale"))
    print(_deps_message, flush=True)
    exit()
def killProc(pid: int) -> None:
    """Kill process by its PID. Meant to be used to kill the main process of bot itself.

    ### Args:
        * pid (`int`): PID of the target
    """
    if osname != "posix":
        # No SIGKILL import here: go through psutil's Process instead.
        Process(pid).kill()
        return

    # Imported lazily so the name is only required on POSIX systems.
    from signal import SIGKILL

    kill(pid, SIGKILL)