This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/modules/utils.py

264 lines
9.2 KiB
Python
Raw Normal View History

2023-01-13 15:45:23 +02:00
from typing import Any, Literal, Tuple, Union
2023-01-03 14:01:46 +02:00
from uuid import uuid1
2022-12-28 19:56:13 +02:00
from requests import get
2022-12-05 19:49:51 +02:00
from pyrogram.enums.chat_type import ChatType
2022-12-10 12:37:15 +02:00
from pyrogram.types import User
2022-12-10 18:29:06 +02:00
from pyrogram.client import Client
from pyrogram.errors import bad_request_400
2022-10-17 00:30:07 +03:00
from ujson import JSONDecodeError as JSONDecodeError
from ujson import loads, dumps
from sys import exit
2023-01-03 14:01:46 +02:00
from os import kill, listdir, makedirs, path, sep
2022-10-17 00:30:07 +03:00
from os import name as osname
from traceback import print_exc
2022-12-28 19:56:13 +02:00
from classes.errors.geo import PlaceNotFoundError
2022-10-17 00:30:07 +03:00
from modules.logging import logWrite
def jsonLoad(filename):
    """Load a JSON file and return its parsed contents

    ### Args:
        * filename (`str`): Path to the JSON file to read.

    ### Raises:
        * JSONDecodeError: The file could be opened but its contents are not valid JSON.
        * FileNotFoundError: The file does not exist.

    ### Returns:
        * any: Parsed contents of the file.
    """
    # `print_exc()` writes to stderr and returns None, so embedding it in an
    # f-string only ever logged the text "None". `format_exc()` returns the
    # traceback as a string, which is what the log message needs.
    from traceback import format_exc

    # `open()` has to be inside the `try`: it is the call that raises
    # FileNotFoundError, so a handler placed inside the `with` body never fires.
    try:
        with open(filename, "r", encoding='utf8') as file:
            output = loads(file.read())
    except JSONDecodeError:
        logWrite(f"Could not load json file {filename}: file seems to be incorrect!\n{format_exc()}")
        raise
    except FileNotFoundError:
        logWrite(f"Could not load json file {filename}: file does not seem to exist!\n{format_exc()}")
        raise
    return output
def jsonSave(contents, filename):
    """Serialize a dict/list to a JSON file

    Saving is best-effort: any failure is logged and swallowed, so the caller
    is not interrupted when the file cannot be written.

    ### Args:
        * contents (`Union[dict, list]`): Data to be dumped.
        * filename (`str`): Path of the file to (over)write.
    """
    # `print_exc()` returns None, which made every logged error end with the
    # text "None"; `format_exc()` returns the traceback as a string instead.
    from traceback import format_exc

    try:
        with open(filename, "w", encoding='utf8') as file:
            file.write(dumps(contents, ensure_ascii=False, indent=4))
    except Exception as exp:
        logWrite(f"Could not save json file {filename}: {exp}\n{format_exc()}")
    return
def nested_set(dic, keys, value, create_missing=True):
    """Set a value inside nested dictionaries, following a path of keys

    ### Args:
        * dic (`dict`): Dictionary to modify in place.
        * keys (`list`): Path of keys from the outermost one to the target.
        * value (`Any`): Value to store under the last key.
        * create_missing (`bool`, optional): Create missing levels (and the target key) on the way. Defaults to True.

    ### Returns:
        * `dict`: The same `dic` object that was passed in.
    """
    node = dic
    # Walk every key except the last one, descending one level per step.
    for name in keys[:-1]:
        if name in node:
            node = node[name]
            continue
        if not create_missing:
            # The path is broken and must not be created: leave `dic` as is.
            return dic
        node = node.setdefault(name, {})
    leaf = keys[-1]
    if leaf in node or create_missing:
        node[leaf] = value
    return dic
def configSet(keys: list, value: Any, file: str = "config", create_missing=True):
    """Write a value into the config (or into a user's data file)

    ### Args:
        * keys (`list`): Path of keys from the outermost one to the target.
        * value (`Any`): Value to store.
        * file (`str`, optional): Name of the file under the users data folder, without extension. Defaults to "config", which targets the main config.
        * create_missing (`bool`, optional): Create missing items on the way. Defaults to True.
    """
    is_main_config = file == "config"
    # The main config lives next to the bot, user files under data/users/.
    filepath = "" if is_main_config else f"data{sep}users{sep}"
    this_dict = jsonLoad(f"{filepath}{file}.json")

    if is_main_config and this_dict["debug"] is True:
        # In debug mode the value is written to config_debug.json instead,
        # unless that file is missing.
        try:
            this_dict = jsonLoad("config_debug.json")
        except FileNotFoundError:
            print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)
        else:
            file = "config_debug"

    updated = nested_set(this_dict, keys, value, create_missing=create_missing)
    jsonSave(updated, f"{filepath}{file}.json")
    return
2022-10-20 13:24:32 +03:00
def configGet(key: str, *args: str, file: str = "config"):
    """Read a value from the config (or from a user's data file)

    ### Args:
        * key (`str`): The last key of the keys path.
        * *args (`str`): Keys leading to it, outermost first: dict[args...][key].
        * file (`str`): Name of the file under the users data folder, without extension. Defaults to "config", which reads the main config.

    ### Returns:
        * any: Value stored under the provided path.
    """
    if file != "config":
        this_dict = jsonLoad(f"data{sep}users{sep}{file}.json")
    else:
        try:
            this_dict = jsonLoad("config.json")
        except FileNotFoundError:
            print("Config file not found! Copy config_example.json to config.json, configure it and rerun the bot!", flush=True)
            exit()
        if this_dict["debug"] is True:
            # Debug mode swaps in config_debug.json when it is available.
            try:
                this_dict = jsonLoad("config_debug.json")
            except FileNotFoundError:
                print("Debug mode is set but config_debug.json is not there! Falling back to config.json", flush=True)

    # Descend through the path and finish on the requested key.
    node = this_dict
    for step in (*args, key):
        node = node[step]
    return node
2022-12-17 01:58:33 +02:00
def locale(key: str, *args: str, locale: Union[str, User] = configGet("locale")) -> Any:
    """Get value of locale string

    Note: the default of `locale` is evaluated once, when this module is
    imported, so it reflects the config as it was at import time.

    ### Args:
        * key (`str`): The last key of the locale's keys path.
        * *args (`list`): Path to key like: dict[args][key].
        * locale (`Union[str, User, HoloUser]`): Locale to look up in. Provide User to get his `.language_code`; any object with a `.locale` attribute is accepted too. Defaults to config's locale value.

    ### Returns:
        * any: Value of provided locale key. In normal case must be `str`, `dict` or `list`. A warning string is returned when the locale file or the requested path cannot be found.
    """
    if isinstance(locale, User):
        locale = locale.language_code
    elif hasattr(locale, "locale"):
        locale = locale.locale

    # A user without a language code ends up here as None.
    if locale is None:
        locale = configGet("locale")

    # Read the locales folder once instead of once per load attempt.
    locales_dir = configGet("locale", "locations")

    try:
        this_dict = jsonLoad(f'{locales_dir}{sep}{locale}.json')
    except FileNotFoundError:
        # Unknown locale: retry with the default one from the config.
        try:
            this_dict = jsonLoad(f'{locales_dir}{sep}{configGet("locale")}.json')
        except FileNotFoundError:
            return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'

    # The whole path is resolved inside the `try`, so a missing intermediate
    # section yields the same warning string as a missing final key instead
    # of an unhandled KeyError.
    try:
        this_key = this_dict
        for dict_key in args:
            this_key = this_key[dict_key]
        return this_key[key]
    except KeyError:
        return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
2022-10-17 00:30:07 +03:00
2022-12-17 01:58:33 +02:00
def all_locales(key: str, *args: str) -> list:
    """Get value of the provided key and path in all available locales

    Locales whose file cannot be found, or which do not contain the requested
    path, are skipped.

    ### Args:
        * key (`str`): The last key of the locale's keys path.
        * *args (`list`): Path to key like: dict[args][key].

    ### Returns:
        * `list`: List of all values in all locales
    """
    # Read the locales folder from the config once; previously the config
    # file was re-read from disk for every single locale.
    locales_dir = configGet("locale", "locations")

    output = []

    for entry in listdir(locales_dir):
        # Locale name is the file name without its last extension.
        lc = ".".join(entry.split(".")[:-1])
        try:
            this_dict = jsonLoad(f'{locales_dir}{sep}{lc}.json')
        except FileNotFoundError:
            continue
        # The path walk is inside the `try` as well, so a locale lacking an
        # intermediate section is skipped rather than aborting the whole call.
        try:
            this_key = this_dict
            for dict_key in args:
                this_key = this_key[dict_key]
            output.append(this_key[key])
        except KeyError:
            continue

    return output
2022-12-28 19:56:13 +02:00
def find_location(query: str) -> dict:
    """Find location on geonames.org by query. Search is made with feature classes A and P.

    ### Args:
        * query (`str`): Some city/village/state name

    ### Raises:
        * PlaceNotFoundError: Exception is raised when API result is empty

    ### Returns:
        * `dict`: One instance of geonames response
    """
    try:
        # Passing the values through `params` lets requests URL-encode them.
        # Interpolating `query` straight into the URL corrupted the request
        # whenever it contained characters such as spaces, "&" or "#".
        # A list of tuples is used because `featureClass` is sent twice.
        params = [
            ("q", query),
            ("maxRows", 1),
            ("countryBias", "UA"),
            ("lang", "uk"),
            ("orderby", "relevance"),
            ("featureClass", "P"),
            ("featureClass", "A"),
            ("username", configGet('username', 'geocoding')),
        ]
        result = get("http://api.geonames.org/searchJSON", params=params).json()
        return result["geonames"][0]
    except (ValueError, KeyError, IndexError) as exp:
        # Keep the original cause attached for easier debugging.
        raise PlaceNotFoundError(query) from exp
2023-01-03 15:30:16 +02:00
def create_tmp(bytedata: Union[bytes, bytearray], kind: Union[Literal["image", "video"], None] = None) -> str:
    """Write bytes into a freshly named file inside the `tmp` folder

    ### Args:
        * bytedata (`Union[bytes, bytearray]`): Bytes to be written
        * kind (`Union[Literal["image", "video"], None]`): Kind of upload. Adds `.jpg` for images and `.mp4` for videos, no extension otherwise

    ### Returns:
        * `str`: Path to the created temporary file
    """
    suffix = ".jpg" if kind == "image" else ".mp4" if kind == "video" else ""
    target = path.join("tmp", f"{uuid1()}{suffix}")
    # The folder may not exist yet on the first call.
    makedirs("tmp", exist_ok=True)
    with open(target, "wb") as tmp_file:
        tmp_file.write(bytedata)
    return target
2023-01-13 15:45:23 +02:00
async def download_tmp(app: Client, file_id: str) -> Tuple[str, bytes]:
    """Download a file into the `tmp` folder and read it back

    ### Args:
        * app (`Client`): App that will download the file
        * file_id (`str`): ID of the file, passed to `app.download_media`

    ### Returns:
        * `Tuple[str, bytes]`: Path of the downloaded file and its contents
    """
    target = path.join("tmp", str(uuid1()))
    # The folder may not exist yet on the first call.
    makedirs("tmp", exist_ok=True)
    await app.download_media(file_id, target)
    with open(target, "rb") as downloaded:
        return target, downloaded.read()
2023-01-03 14:01:46 +02:00
2022-10-17 00:30:07 +03:00
# `Process` from psutil is what killProc() relies on when not running on POSIX.
# Without the package installed the module refuses to load and stops the bot.
try:
    from psutil import Process
except ModuleNotFoundError:
    # The message is hard-coded in English rather than taken from the locale files.
    print("Missing dependencies! Please install all needed dependencies and run the bot again!")
    exit()
def killProc(pid):
    """Forcefully kill a process by its PID

    ### Args:
        * pid (`int`): ID of the process to kill
    """
    if osname != "posix":
        # No SIGKILL outside POSIX: let psutil do the killing.
        Process(pid).kill()
        return
    from signal import SIGKILL
    kill(pid, SIGKILL)
def should_quote(msg):
    """Tell whether a reply to the message should quote it

    ### Args:
        * msg: Message whose `.chat.type` is inspected

    ### Returns:
        * `bool`: False for private chats, True for every other chat type
    """
    return msg.chat.type is not ChatType.PRIVATE
async def find_user(app: Client, query: Union[str, int]):
    """Find a user by numeric ID or by username

    The query is first tried as a numeric ID. When it is not a number, or the
    numeric lookup finds nothing, the raw query is looked up instead.

    ### Args:
        * app (`Client`): App used to perform the lookup
        * query (`Union[str, int]`): User ID or username

    ### Returns:
        * Result of `app.get_users` for the query, or `None` when nobody was found
    """
    try:
        result = await app.get_users(int(query))
    except (TypeError, ValueError, bad_request_400.PeerIdInvalid):
        # Not a number, or an ID the client cannot resolve. PeerIdInvalid used
        # to escape from here uncaught, although the very same error from the
        # fallback lookup below was treated as "not found".
        result = None

    if not (result is None or result == []):
        return result

    try:
        return await app.get_users(query)
    except (
        bad_request_400.UsernameNotOccupied,
        bad_request_400.UsernameInvalid,
        bad_request_400.PeerIdInvalid,
    ):
        return None