Custom path for data is now possible

This commit is contained in:
Profitroll 2022-09-15 12:45:29 +02:00
parent 7f3b5174ff
commit 02d945aef5
4 changed files with 88 additions and 22 deletions

19
bot.py
View File

@ -25,10 +25,11 @@ async def start(app, msg):
await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING) await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING)
user_locale = msg.from_user.language_code user_locale = msg.from_user.language_code
data = configGet("data")
if f"{msg.from_user.id}.json" not in listdir(f"data{sep}users{sep}"): if f"{msg.from_user.id}.json" not in listdir(f"{data}{sep}users{sep}"):
logWrite(f'Creating blank data file for {msg.from_user.id}') logWrite(f'Creating blank data file for {msg.from_user.id}')
jsonSave( f"data{sep}users{sep}{msg.from_user.id}.json", {"api_key": None, "linked": False, "context": {"action": None, "data": None}} ) jsonSave( f"{data}{sep}users{sep}{msg.from_user.id}.json", {"api_key": None, "linked": False, "context": {"action": None, "data": None}} )
if not userGet(msg.from_user.id, "linked"): if not userGet(msg.from_user.id, "linked"):
await msg.reply_text(locale("link_input", "msg", locale=user_locale), reply_markup=ForceReply(placeholder=locale("link", "fry", locale=user_locale))) await msg.reply_text(locale("link_input", "msg", locale=user_locale), reply_markup=ForceReply(placeholder=locale("link", "fry", locale=user_locale)))
@ -44,14 +45,15 @@ async def unlink(app, msg):
await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING) await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING)
user_locale = msg.from_user.language_code user_locale = msg.from_user.language_code
data = configGet("data")
if not userGet(msg.from_user.id, "linked"): if not userGet(msg.from_user.id, "linked"):
await msg.reply_text(locale("not_linked", "msg", locale=user_locale)) await msg.reply_text(locale("not_linked", "msg", locale=user_locale))
else: else:
try: try:
keys_storage = jsonLoad(f"data{sep}keys_storage.json") keys_storage = jsonLoad(f"{data}{sep}keys_storage.json")
del keys_storage[userGet(msg.from_user.id, "api_key")] del keys_storage[userGet(msg.from_user.id, "api_key")]
jsonSave(f"data{sep}keys_storage.json", keys_storage) jsonSave(f"{data}{sep}keys_storage.json", keys_storage)
except: except:
pass pass
userClear(msg.from_user.id, "api_key") userClear(msg.from_user.id, "api_key")
@ -87,14 +89,15 @@ async def any_message_handler(app, msg):
if userGet(msg.from_user.id, "context") == "link_key": if userGet(msg.from_user.id, "context") == "link_key":
user_locale = msg.from_user.language_code user_locale = msg.from_user.language_code
data = configGet("data")
if msg.text in jsonLoad(configGet("api_keys"))["autozoom"]: if msg.text in jsonLoad(configGet("api_keys"))["autozoom"]:
await msg.reply_text(locale("key_correct", "msg", locale=user_locale)) await msg.reply_text(locale("key_correct", "msg", locale=user_locale))
userSet(msg.from_user.id, "api_key", msg.text) userSet(msg.from_user.id, "api_key", msg.text)
userSet(msg.from_user.id, "linked", True) userSet(msg.from_user.id, "linked", True)
keys_storage = jsonLoad(f"data{sep}keys_storage.json") keys_storage = jsonLoad(f"{data}{sep}keys_storage.json")
keys_storage[msg.text] = msg.from_user.id keys_storage[msg.text] = msg.from_user.id
jsonSave(f"data{sep}keys_storage.json", keys_storage) jsonSave(f"{data}{sep}keys_storage.json", keys_storage)
logWrite(f"Added apikey {msg.text} for user {msg.from_user.id}") logWrite(f"Added apikey {msg.text} for user {msg.from_user.id}")
else: else:
logWrite(f"User {msg.from_user.id} tried to pair with invalid apikey {msg.text}") logWrite(f"User {msg.from_user.id} tried to pair with invalid apikey {msg.text}")
@ -110,8 +113,10 @@ if __name__ == "__main__":
app.start() app.start()
app.send_message(configGet("admin"), f"Starting bot with pid `{pid}`") app.send_message(configGet("admin"), f"Starting bot with pid `{pid}`")
locales = configGet("locales")
for entry in listdir("locale"): for entry in listdir(locales):
if entry.endswith(".json"): if entry.endswith(".json"):

View File

@ -3,5 +3,8 @@
"api_id": 1234567, "api_id": 1234567,
"api_hash": "01234567890qwertyuiop", "api_hash": "01234567890qwertyuiop",
"bot_token": "12345678:asdfghjklzxcvbnm", "bot_token": "12345678:asdfghjklzxcvbnm",
"api_keys": "/home/user/AutoZoomAPI/data/api_keys.json" "api_keys": "/home/user/AutoZoomAPI/data/api_keys.json",
"data": "data",
"locales": "locale",
"logs": "logs"
} }

View File

@ -10,6 +10,8 @@ from shutil import copyfileobj
from gzip import open as gzipopen from gzip import open as gzipopen
from psutil import Process from psutil import Process
from modules.functions_bot import configGet
def nowtimeGet(format="%H:%M:%S | %d.%m.%Y") -> str: def nowtimeGet(format="%H:%M:%S | %d.%m.%Y") -> str:
"""Return current local time formatted as arg. """Return current local time formatted as arg.
@ -23,7 +25,7 @@ def nowtimeGet(format="%H:%M:%S | %d.%m.%Y") -> str:
return datetime.now().strftime(format) return datetime.now().strftime(format)
def checkSize(logs_folder=f"logs{sep}", log_size=1024) -> None: def checkSize(logs_folder=f"{configGet('logs')}{sep}", log_size=1024) -> None:
"""Checks latest log file size and rotates it if needed. """Checks latest log file size and rotates it if needed.
### Args: ### Args:
@ -54,7 +56,7 @@ def checkSize(logs_folder=f"logs{sep}", log_size=1024) -> None:
i += 1 i += 1
def logWrite(message: str, logs_folder=f"logs{sep}", level="INFO") -> None: def logWrite(message: str, logs_folder=f"{configGet('logs')}{sep}", level="INFO") -> None:
"""Append some message to latest log file. """Append some message to latest log file.
### Args: ### Args:

View File

@ -4,7 +4,8 @@
from types import NoneType from types import NoneType
from typing import Union from typing import Union
from modules.functions import jsonLoad, jsonSave from modules.functions import jsonLoad, jsonSave
from os import sep from os import sep, listdir
def configGet(key: str, *args: str) -> any: def configGet(key: str, *args: str) -> any:
"""Get value of the config key """Get value of the config key
@ -20,6 +21,7 @@ def configGet(key: str, *args: str) -> any:
this_key = this_key[dict_key] this_key = this_key[dict_key]
return this_key[key] return this_key[key]
def configAppend(key: str, value: Union[str, float, int, bool, dict, list, NoneType], *args: str) -> None: def configAppend(key: str, value: Union[str, float, int, bool, dict, list, NoneType], *args: str) -> None:
"""Set key to a value """Set key to a value
### Args: ### Args:
@ -39,6 +41,7 @@ def configAppend(key: str, value: Union[str, float, int, bool, dict, list, NoneT
jsonSave(this_dict, "config.json") jsonSave(this_dict, "config.json")
return return
def configRemove(key: str, value: Union[str, float, int, bool, dict, list, NoneType]) -> None: def configRemove(key: str, value: Union[str, float, int, bool, dict, list, NoneType]) -> None:
"""Remove value from config's list key """Remove value from config's list key
@ -62,12 +65,14 @@ def locale(key: str, *args: str, locale=configGet("locale")) -> str:
""" """
if (locale == None): if (locale == None):
locale = configGet("locale") locale = configGet("locale")
locales = configGet("locales")
try: try:
this_dict = jsonLoad(f'locale{sep}{locale}.json') this_dict = jsonLoad(f'{locales}{sep}{locale}.json')
except FileNotFoundError: except FileNotFoundError:
try: try:
this_dict = jsonLoad(f'locale{sep}{configGet("locale")}.json') this_dict = jsonLoad(f'{locales}{sep}{configGet("locale")}.json')
except FileNotFoundError: except FileNotFoundError:
return f'⚠ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"' return f'⚠ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
@ -88,10 +93,12 @@ def userSet(userid: Union[str, int], key: str, value: Union[str, float, int, boo
* userid (Union[str, int]): ID of a user. * userid (Union[str, int]): ID of a user.
* key (str): Key of a user's variable. * key (str): Key of a user's variable.
* value (Union[str, float, int, bool, dict, list, NoneType]): Some needed value. * value (Union[str, float, int, bool, dict, list, NoneType]): Some needed value.
""" """
user = jsonLoad(f"data{sep}users{sep}{userid}.json") data = configGet("data")
user = jsonLoad(f"{data}{sep}users{sep}{userid}.json")
user[key] = value user[key] = value
jsonSave(f"data{sep}users{sep}{userid}.json", user) jsonSave(f"{data}{sep}users{sep}{userid}.json", user)
def userGet(userid: Union[str, int], key: str) -> any: def userGet(userid: Union[str, int], key: str) -> any:
"""Get user's variable """Get user's variable
@ -102,24 +109,73 @@ def userGet(userid: Union[str, int], key: str) -> any:
### Returns: ### Returns:
* any: Value of requested key or None * any: Value of requested key or None
""" """
data = configGet("data")
try: try:
return jsonLoad(f"data{sep}users{sep}{userid}.json")[key] return jsonLoad(f"{data}{sep}users{sep}{userid}.json")[key]
except KeyError: except KeyError:
return None return None
except FileNotFoundError: except FileNotFoundError:
return None return None
def userClear(userid: Union[str, int], key: str) -> None: def userClear(userid: Union[str, int], key: str) -> None:
"""Clear user's variable """Clear user's variable
### Args: ### Args:
* userid (Union[str, int]): ID of a user. * userid (Union[str, int]): ID of a user.
* key (str): Key of a user's variable. * key (str): Key of a user's variable.
""" """
data = configGet("data")
try: try:
user = jsonLoad(f"data{sep}users{sep}{userid}.json") user = jsonLoad(f"{data}{sep}users{sep}{userid}.json")
del user[key] del user[key]
jsonSave(f"data{sep}users{sep}{userid}.json", user) jsonSave(f"{data}{sep}users{sep}{userid}.json", user)
except KeyError: except KeyError:
pass pass
def localeName(command: str, option: Union[str, NoneType]):
"""Get name of a command or command's option
### Args:
* command (str): Command that is set in locale file
* option (Union[str, NoneType]): Option's name or None (if command's name requested)
### Returns:
* str: Name of a command or an option
"""
output = {}
locales = configGet("locales")
for entry in listdir(locales):
if entry.endswith(".json"):
if entry.replace(".json", "") != configGet("locale"):
all_commands = locale("cmd", locale=entry.replace(".json", ""))
if option != None:
output[entry.replace(".json", "")] = all_commands[command]["options"][option]["name"]
else:
output[entry.replace(".json", "")] = all_commands[command]["name"]
return output
def localeDescription(command: str, option: Union[str, NoneType]):
"""Get description of a command or command's option
### Args:
* command (str): Command that is set in locale file
* option (Union[str, NoneType]): Option's name or None (if command's description requested)
### Returns:
* str: Description of a command or an option
"""
output = {}
locales = configGet("locales")
for entry in listdir(locales):
if entry.endswith(".json"):
if entry.replace(".json", "") != configGet("locale"):
all_commands = locale("cmd", locale=entry.replace(".json", ""))
if option != None:
output[entry.replace(".json", "")] = all_commands[command]["options"][option]["description"]
else:
output[entry.replace(".json", "")] = all_commands[command]["description"]
return output