dev (#19)
* Config file changes * Commands `/remove`, `/import` and `/export` Co-authored-by: profitroll <vozhd.kk@gmail.com> Co-authored-by: Profitroll <47523801+profitrollgame@users.noreply.github.com> Co-authored-by: Renovate <renovate@git.end-play.xyz> Reviewed-on: #19
This commit is contained in:
@@ -1,77 +1,263 @@
|
||||
"""This is only a temporary solution. Complete Photos API client is yet to be developed."""
|
||||
|
||||
import asyncio
|
||||
from base64 import b64decode, b64encode
|
||||
from os import makedirs, path, sep
|
||||
from random import choice
|
||||
from traceback import print_exc
|
||||
from typing import Tuple, Union
|
||||
|
||||
from requests import get, patch, post
|
||||
import aiofiles
|
||||
from aiohttp import FormData
|
||||
|
||||
from classes.exceptions import (
|
||||
AlbumCreationDuplicateError,
|
||||
AlbumCreationError,
|
||||
AlbumCreationNameError,
|
||||
SubmissionUploadError,
|
||||
UserCreationDuplicateError,
|
||||
UserCreationError,
|
||||
)
|
||||
from modules.logger import logWrite
|
||||
from modules.utils import configGet
|
||||
from modules.utils import configGet, locale
|
||||
from modules.http_client import http_session
|
||||
|
||||
|
||||
async def authorize() -> str:
|
||||
makedirs(configGet("cache", "locations"), exist_ok=True)
|
||||
if path.exists(configGet("cache", "locations")+sep+"api_access") is True:
|
||||
with open(configGet("cache", "locations")+sep+"api_access", "rb") as file:
|
||||
token = b64decode(file.read()).decode("utf-8")
|
||||
if get(configGet("address", "posting", "api")+"/users/me/", headers={"Authorization": f"Bearer {token}"}).status_code == 200:
|
||||
if path.exists(configGet("cache", "locations") + sep + "api_access") is True:
|
||||
async with aiofiles.open(
|
||||
configGet("cache", "locations") + sep + "api_access", "rb"
|
||||
) as file:
|
||||
token = b64decode(await file.read()).decode("utf-8")
|
||||
if (
|
||||
await http_session.get(
|
||||
configGet("address", "posting", "api") + "/users/me/",
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
).status == 200:
|
||||
return token
|
||||
payload = {
|
||||
"grant_type": "password",
|
||||
"scope": "me albums.list albums.read albums.write photos.list photos.read photos.write videos.list videos.read videos.write",
|
||||
"username": configGet("username", "posting", "api"),
|
||||
"password": configGet("password", "posting", "api")
|
||||
"password": configGet("password", "posting", "api"),
|
||||
}
|
||||
response = post(configGet("address", "posting", "api")+"/token", data=payload)
|
||||
if response.status_code != 200:
|
||||
logWrite(f'Incorrect API credentials! Could not login into "{configGet("address", "posting", "api")}" using login "{configGet("username", "posting", "api")}": HTTP {response.status_code}')
|
||||
response = await http_session.post(
|
||||
configGet("address", "posting", "api") + "/token", data=payload
|
||||
)
|
||||
if not response.ok:
|
||||
logWrite(
|
||||
locale(
|
||||
"api_creds_invalid",
|
||||
"console",
|
||||
locale=configGet("locale_log").format(
|
||||
configGet("address", "posting", "api"),
|
||||
configGet("username", "posting", "api"),
|
||||
response.status,
|
||||
),
|
||||
)
|
||||
)
|
||||
raise ValueError
|
||||
with open(configGet("cache", "locations")+sep+"api_access", "wb") as file:
|
||||
file.write(b64encode(response.json()["access_token"].encode("utf-8")))
|
||||
return response.json()["access_token"]
|
||||
async with aiofiles.open(
|
||||
configGet("cache", "locations") + sep + "api_access", "wb"
|
||||
) as file:
|
||||
await file.write(
|
||||
b64encode((await response.json())["access_token"].encode("utf-8"))
|
||||
)
|
||||
return (await response.json())["access_token"]
|
||||
|
||||
|
||||
async def random_pic(token: Union[str, None] = None) -> Tuple[str, str]:
|
||||
"""Returns random image id and filename from the queue.
|
||||
|
||||
### Returns:
|
||||
* `Tuple[str, str]`: First value is an ID and the filename in the filesystem to be indexed.
|
||||
"""
|
||||
if token is None:
|
||||
token = await authorize()
|
||||
logWrite(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size=100&caption=queue')
|
||||
resp = get(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size=100&caption=queue', headers={"Authorization": f"Bearer {token}"})
|
||||
if resp.status_code != 200:
|
||||
logWrite(f'Could not get photos from album {configGet("album", "posting", "api")}: HTTP {resp.status_code}')
|
||||
logWrite(f'Could not get photos from "{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size=100&caption=queue" using token "{token}": HTTP {resp.status_code}', debug=True)
|
||||
"""
|
||||
token = await authorize() if token is None else token
|
||||
logWrite(
|
||||
f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue'
|
||||
)
|
||||
resp = await http_session.get(
|
||||
f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos?q=&page_size={configGet("page_size", "posting")}&caption=queue',
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
logWrite(
|
||||
locale("random_pic_response", "console", locale=configGet("locale_log")).format(
|
||||
await resp.json()
|
||||
),
|
||||
debug=True,
|
||||
)
|
||||
if resp.status != 200:
|
||||
logWrite(
|
||||
locale(
|
||||
"random_pic_error_code",
|
||||
"console",
|
||||
locale=configGet("locale_log").format(
|
||||
configGet("album", "posting", "api"), resp.status
|
||||
),
|
||||
),
|
||||
)
|
||||
logWrite(
|
||||
locale(
|
||||
"random_pic_error_debug",
|
||||
"console",
|
||||
locale=configGet("locale_log").format(
|
||||
configGet("address", "posting", "api"),
|
||||
configGet("album", "posting", "api"),
|
||||
configGet("page_size", "posting"),
|
||||
token,
|
||||
resp.status,
|
||||
),
|
||||
),
|
||||
debug=True,
|
||||
)
|
||||
raise ValueError
|
||||
if len(resp.json()["results"]) == 0:
|
||||
if len((await resp.json())["results"]) == 0:
|
||||
raise KeyError
|
||||
pic = choice(resp.json()["results"])
|
||||
pic = choice((await resp.json())["results"])
|
||||
return pic["id"], pic["filename"]
|
||||
|
||||
async def upload_pic(filepath: str) -> Tuple[bool, list]:
|
||||
token = await authorize()
|
||||
|
||||
async def upload_pic(
|
||||
filepath: str, allow_duplicates: bool = False, token: Union[str, None] = None
|
||||
) -> Tuple[bool, list, Union[str, None]]:
|
||||
token = await authorize() if token is None else token
|
||||
try:
|
||||
pic_name = path.basename(filepath)
|
||||
files = {'file': (pic_name, open(filepath, 'rb'), 'image/jpeg')}
|
||||
response = post(f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos&caption=queue', headers={"Authorization": f"Bearer {token}"}, files=files).json()
|
||||
print(response, flush=True)
|
||||
logWrite(f"Uploading {pic_name} to the API...", debug=True)
|
||||
async with aiofiles.open(filepath, "rb") as f:
|
||||
file_bytes = await f.read()
|
||||
formdata = FormData()
|
||||
formdata.add_field(
|
||||
"file", file_bytes, filename=pic_name, content_type="image/jpeg"
|
||||
)
|
||||
response = await http_session.post(
|
||||
f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
|
||||
params={
|
||||
"caption": "queue",
|
||||
"compress": "false",
|
||||
"ignore_duplicates": str(allow_duplicates).lower(),
|
||||
},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
data=formdata,
|
||||
)
|
||||
response_json = await response.json()
|
||||
if response.status != 200 and response.status != 409:
|
||||
logWrite(
|
||||
locale(
|
||||
"pic_upload_error",
|
||||
"console",
|
||||
locale=configGet("locale_log").format(
|
||||
filepath, response.status, response.content
|
||||
),
|
||||
),
|
||||
)
|
||||
raise SubmissionUploadError(
|
||||
str(filepath), response.status, response.content
|
||||
)
|
||||
id = response_json["id"] if "id" in await response.json() else None
|
||||
duplicates = []
|
||||
if "duplicates" in response:
|
||||
for duplicate in response["duplicates"]:
|
||||
duplicates.append(f'{configGet("address", "posting", "api")}/photos/{duplicate["id"]}')
|
||||
return True, duplicates
|
||||
except:
|
||||
return False, []
|
||||
if "duplicates" in response_json:
|
||||
for index, duplicate in enumerate(response_json["duplicates"]): # type: ignore
|
||||
if response_json["access_token"] is None:
|
||||
duplicates.append(
|
||||
f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/photos/{duplicate["id"]}'
|
||||
)
|
||||
else:
|
||||
duplicates.append(
|
||||
f'`{duplicate["id"]}`:\n{configGet("address_external", "posting", "api")}/token/photo/{response_json["access_token"]}?id={index}'
|
||||
)
|
||||
return True, duplicates, id
|
||||
except Exception as exp:
|
||||
print_exc()
|
||||
return False, [], None
|
||||
|
||||
async def move_pic(id: str) -> bool:
|
||||
token = await authorize()
|
||||
|
||||
async def find_pic(
|
||||
name: str, caption: Union[str, None] = None, token: Union[str, None] = None
|
||||
) -> Union[dict, None]:
|
||||
token = await authorize() if token is None else token
|
||||
try:
|
||||
patch(f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent', headers={"Authorization": f"Bearer {token}"})
|
||||
response = await http_session.get(
|
||||
f'{configGet("address", "posting", "api")}/albums/{configGet("album", "posting", "api")}/photos',
|
||||
params={"q": name, "caption": caption},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
# logWrite(response.json())
|
||||
if response.status != 200:
|
||||
return None
|
||||
if len((await response.json())["results"]) == 0:
|
||||
return None
|
||||
return (await response.json())["results"]
|
||||
except Exception as exp:
|
||||
logWrite(
|
||||
locale(
|
||||
"find_pic_error",
|
||||
"console",
|
||||
locale=configGet("locale_log").format(name, caption, exp),
|
||||
),
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
async def move_pic(id: str, token: Union[str, None] = None) -> bool:
|
||||
token = await authorize() if token is None else token
|
||||
try:
|
||||
response = await http_session.patch(
|
||||
f'{configGet("address", "posting", "api")}/photos/{id}?caption=sent',
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
if response.status != 200:
|
||||
logWrite(f"Media moving failed with HTTP {response.status}", debug=True)
|
||||
return False
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
async def remove_pic(id: str, token: Union[str, None] = None) -> bool:
|
||||
token = await authorize() if token is None else token
|
||||
try:
|
||||
response = await http_session.delete(
|
||||
f'{configGet("address", "posting", "api")}/photos/{id}',
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
if response.status != 204:
|
||||
logWrite(f"Media removal failed with HTTP {response.status}", debug=True)
|
||||
return False
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
|
||||
async def create_user(username: str, email: str, password: str) -> None:
|
||||
response = await http_session.post(
|
||||
f'{configGet("address", "posting", "api")}/users',
|
||||
data={"user": username, "email": email, "password": password},
|
||||
)
|
||||
if response.status == 409:
|
||||
raise UserCreationDuplicateError(username)
|
||||
elif response.status != 204:
|
||||
raise UserCreationError(response.status, await response.text(encoding="utf-8"))
|
||||
return None
|
||||
|
||||
|
||||
async def create_album(name: str, title: str) -> None:
|
||||
token = await authorize()
|
||||
response = await http_session.post(
|
||||
f'{configGet("address", "posting", "api")}/albums',
|
||||
params={"name": name, "title": title},
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
if response.status == 409:
|
||||
raise AlbumCreationDuplicateError(name)
|
||||
elif response.status == 406:
|
||||
raise AlbumCreationNameError(await response.json())
|
||||
elif response.status != 200:
|
||||
raise AlbumCreationError(response.status, await response.text(encoding="utf-8"))
|
||||
return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(asyncio.run(authorize()))
|
||||
print(asyncio.run(authorize()))
|
||||
|
@@ -1,4 +1,14 @@
|
||||
from pyrogram.client import Client
|
||||
from modules.utils import configGet
|
||||
from classes.poster_client import PosterClient
|
||||
from convopyro import Conversation
|
||||
|
||||
app = Client("duptsiaposter", bot_token=configGet("bot_token", "bot"), api_id=configGet("api_id", "bot"), api_hash=configGet("api_hash", "bot"))
|
||||
app = PosterClient(
|
||||
"duptsiaposter",
|
||||
bot_token=configGet("bot_token", "bot"),
|
||||
api_id=configGet("api_id", "bot"),
|
||||
api_hash=configGet("api_hash", "bot"),
|
||||
)
|
||||
|
||||
Conversation(app)
|
||||
|
||||
users_with_context = []
|
||||
|
78
modules/cli.py
Normal file
78
modules/cli.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import asyncio
|
||||
from sys import exit
|
||||
from traceback import print_exc
|
||||
from modules.api_client import create_album, create_user, http_session
|
||||
from argparse import ArgumentParser
|
||||
|
||||
from modules.utils import configSet
|
||||
|
||||
parser = ArgumentParser(
|
||||
prog="Telegram Poster",
|
||||
description="Bot for posting some of your stuff and also receiving submissions.",
|
||||
)
|
||||
|
||||
parser.add_argument("--create-user", action="store_true")
|
||||
parser.add_argument("--create-album", action="store_true")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
|
||||
async def cli_create_user() -> None:
|
||||
print(
|
||||
"To set up Photos API connection you need to create a new user.\nIf you have email confirmation enabled in your Photos API config - you need to use a real email that will get a confirmation code afterwards.",
|
||||
flush=True,
|
||||
)
|
||||
username = input("Choose username for new Photos API user: ").strip()
|
||||
email = input(f"Choose email for user '{username}': ").strip()
|
||||
password = input(f"Choose password for user '{username}': ").strip()
|
||||
try:
|
||||
result_1 = await create_user(username, email, password)
|
||||
# asyncio.run(create_user(username, email, password))
|
||||
configSet("username", username, "posting", "api")
|
||||
configSet("password", password, "posting", "api")
|
||||
none = input(
|
||||
"Alright. If you have email confirmation enabled - please confirm registration by using the link in your email. After that press Enter. Otherwise just press Enter."
|
||||
)
|
||||
except Exception as exp:
|
||||
print(f"Could not create a user due to {exp}", flush=True)
|
||||
print_exc()
|
||||
exit()
|
||||
if not args.create_album:
|
||||
print("You're done!", flush=True)
|
||||
await http_session.close()
|
||||
exit()
|
||||
return None
|
||||
|
||||
|
||||
async def cli_create_album() -> None:
|
||||
print(
|
||||
"To use Photos API your user needs to have an album to store its data.\nThis wizard will help you to create a new album with its name and title.",
|
||||
flush=True,
|
||||
)
|
||||
name = input("Choose a name for your album: ").strip()
|
||||
title = input(f"Choose a title for album '{name}': ").strip()
|
||||
try:
|
||||
result_2 = await create_album(name, title)
|
||||
# asyncio.run(create_album(name, title))
|
||||
configSet("album", name, "posting", "api")
|
||||
except Exception as exp:
|
||||
print(f"Could not create an album due to {exp}", flush=True)
|
||||
print_exc()
|
||||
exit()
|
||||
print("You're done!", flush=True)
|
||||
await http_session.close()
|
||||
exit()
|
||||
return None
|
||||
|
||||
|
||||
if args.create_user or args.create_album:
|
||||
loop = asyncio.get_event_loop()
|
||||
tasks = []
|
||||
|
||||
if args.create_user:
|
||||
loop.run_until_complete(asyncio.wait([loop.create_task(cli_create_user())]))
|
||||
|
||||
if args.create_album:
|
||||
loop.run_until_complete(asyncio.wait([loop.create_task(cli_create_album())]))
|
||||
|
||||
loop.close()
|
@@ -1,23 +1,37 @@
|
||||
from os import listdir
|
||||
from pyrogram.client import Client
|
||||
from classes.poster_client import PosterClient
|
||||
from pyrogram.types import BotCommand, BotCommandScopeChat
|
||||
from modules.utils import configGet, locale
|
||||
|
||||
async def register_commands(app: Client):
|
||||
|
||||
async def register_commands(app: PosterClient) -> None:
|
||||
if configGet("submit", "mode"):
|
||||
# Registering user commands
|
||||
for entry in listdir(configGet("locale", "locations")):
|
||||
if entry.endswith(".json"):
|
||||
commands_list = []
|
||||
for command in configGet("commands"):
|
||||
commands_list.append(BotCommand(command, locale(command, "commands", locale=entry.replace(".json", ""))))
|
||||
await app.set_bot_commands(commands_list, language_code=entry.replace(".json", ""))
|
||||
commands_list.append(
|
||||
BotCommand(
|
||||
command,
|
||||
locale(
|
||||
command, "commands", locale=entry.replace(".json", "")
|
||||
),
|
||||
)
|
||||
)
|
||||
await app.set_bot_commands(
|
||||
commands_list, language_code=entry.replace(".json", "")
|
||||
)
|
||||
|
||||
# Registering user commands for fallback locale
|
||||
commands_list = []
|
||||
for command in configGet("commands"):
|
||||
commands_list.append(BotCommand(command, locale(command, "commands", locale=configGet("locale_fallback"))))
|
||||
commands_list.append(
|
||||
BotCommand(
|
||||
command,
|
||||
locale(command, "commands", locale=configGet("locale_fallback")),
|
||||
)
|
||||
)
|
||||
await app.set_bot_commands(commands_list)
|
||||
|
||||
# Registering admin commands
|
||||
@@ -25,9 +39,19 @@ async def register_commands(app: Client):
|
||||
|
||||
if configGet("submit", "mode"):
|
||||
for command in configGet("commands"):
|
||||
|
||||
commands_admin_list.append(BotCommand(command, locale(command, "commands", locale=configGet("locale"))))
|
||||
commands_admin_list.append(
|
||||
BotCommand(
|
||||
command, locale(command, "commands", locale=configGet("locale"))
|
||||
)
|
||||
)
|
||||
for command in configGet("commands_admin"):
|
||||
commands_admin_list.append(BotCommand(command, locale(command, "commands_admin", locale=configGet("locale"))))
|
||||
commands_admin_list.append(
|
||||
BotCommand(
|
||||
command, locale(command, "commands_admin", locale=configGet("locale"))
|
||||
)
|
||||
)
|
||||
|
||||
await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("admin")))
|
||||
for admin in app.admins:
|
||||
await app.set_bot_commands(
|
||||
commands_admin_list, scope=BotCommandScopeChat(chat_id=admin)
|
||||
)
|
||||
|
@@ -8,18 +8,16 @@ with open("config.json", "r", encoding="utf-8") as f:
|
||||
f.close()
|
||||
|
||||
if db_config["user"] is not None and db_config["password"] is not None:
|
||||
con_string = 'mongodb://{0}:{1}@{2}:{3}/{4}'.format(
|
||||
con_string = "mongodb://{0}:{1}@{2}:{3}/{4}".format(
|
||||
db_config["user"],
|
||||
db_config["password"],
|
||||
db_config["host"],
|
||||
db_config["port"],
|
||||
db_config["name"]
|
||||
db_config["name"],
|
||||
)
|
||||
else:
|
||||
con_string = 'mongodb://{0}:{1}/{2}'.format(
|
||||
db_config["host"],
|
||||
db_config["port"],
|
||||
db_config["name"]
|
||||
con_string = "mongodb://{0}:{1}/{2}".format(
|
||||
db_config["host"], db_config["port"], db_config["name"]
|
||||
)
|
||||
|
||||
db_client = MongoClient(con_string)
|
||||
@@ -34,4 +32,4 @@ for collection in ["sent", "users", "banned", "submitted"]:
|
||||
col_sent = db.get_collection("sent")
|
||||
col_users = db.get_collection("users")
|
||||
col_banned = db.get_collection("banned")
|
||||
col_submitted = db.get_collection("submitted")
|
||||
col_submitted = db.get_collection("submitted")
|
||||
|
6
modules/http_client.py
Normal file
6
modules/http_client.py
Normal file
@@ -0,0 +1,6 @@
|
||||
from aiohttp import ClientSession
|
||||
from ujson import dumps
|
||||
|
||||
http_session = ClientSession(
|
||||
json_serialize=dumps,
|
||||
)
|
@@ -1,23 +1,19 @@
|
||||
try:
|
||||
from ujson import loads
|
||||
except ModuleNotFoundError:
|
||||
from json import loads
|
||||
|
||||
from os import stat, makedirs, path, getcwd
|
||||
from datetime import datetime
|
||||
from gzip import open as gzipopen
|
||||
from os import getcwd, makedirs, path, stat
|
||||
from shutil import copyfileobj
|
||||
|
||||
from datetime import datetime
|
||||
from ujson import loads
|
||||
|
||||
with open(getcwd()+path.sep+"config.json", "r", encoding='utf8') as file:
|
||||
with open(getcwd() + path.sep + "config.json", "r", encoding="utf8") as file:
|
||||
json_contents = loads(file.read())
|
||||
log_size = json_contents["logging"]["size"]
|
||||
log_folder = json_contents["logging"]["location"]
|
||||
file.close()
|
||||
|
||||
# Check latest log size
|
||||
def checkSize(debug=False):
|
||||
|
||||
# Check latest log size
|
||||
def checkSize(debug=False) -> None:
|
||||
global log_folder
|
||||
|
||||
if debug:
|
||||
@@ -29,18 +25,26 @@ def checkSize(debug=False):
|
||||
makedirs(log_folder, exist_ok=True)
|
||||
log = stat(path.join(log_folder, log_file))
|
||||
if (log.st_size / 1024) > log_size:
|
||||
with open(path.join(log_folder, log_file), 'rb') as f_in:
|
||||
with gzipopen(path.join(log_folder, f'{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz'), 'wb') as f_out:
|
||||
with open(path.join(log_folder, log_file), "rb") as f_in:
|
||||
with gzipopen(
|
||||
path.join(
|
||||
log_folder,
|
||||
f'{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz',
|
||||
),
|
||||
"wb",
|
||||
) as f_out:
|
||||
copyfileobj(f_in, f_out)
|
||||
print(f'Copied {path.join(log_folder, datetime.now().strftime("%d.%m.%Y_%H:%M:%S"))}.log.gz')
|
||||
open(path.join(log_folder, log_file), 'w').close()
|
||||
print(
|
||||
f'Copied {path.join(log_folder, datetime.now().strftime("%d.%m.%Y_%H:%M:%S"))}.log.gz'
|
||||
)
|
||||
open(path.join(log_folder, log_file), "w").close()
|
||||
except FileNotFoundError:
|
||||
print(f'Log file {path.join(log_folder, log_file)} does not exist')
|
||||
print(f"Log file {path.join(log_folder, log_file)} does not exist")
|
||||
pass
|
||||
|
||||
# Append string to log
|
||||
def logAppend(message, debug=False):
|
||||
|
||||
# Append string to log
|
||||
def logAppend(message, debug=False) -> None:
|
||||
global log_folder
|
||||
|
||||
message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] {message}'
|
||||
@@ -51,12 +55,13 @@ def logAppend(message, debug=False):
|
||||
else:
|
||||
log_file = "latest.log"
|
||||
|
||||
log = open(path.join(log_folder, log_file), 'a')
|
||||
log.write(f'{message_formatted}\n')
|
||||
log = open(path.join(log_folder, log_file), "a")
|
||||
log.write(f"{message_formatted}\n")
|
||||
log.close()
|
||||
|
||||
|
||||
# Print to stdout and then to log
|
||||
def logWrite(message, debug=False):
|
||||
def logWrite(message, debug=False) -> None:
|
||||
# save to log file and rotation is to be done
|
||||
logAppend(f'{message}', debug=debug)
|
||||
print(f"{message}", flush=True)
|
||||
logAppend(f"{message}", debug=debug)
|
||||
print(f"{message}", flush=True)
|
||||
|
@@ -1,5 +1,6 @@
|
||||
from datetime import datetime, timedelta
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from pytimeparse.timeparse import timeparse
|
||||
from modules.utils import configGet
|
||||
from modules.sender import send_content
|
||||
from modules.commands_register import register_commands
|
||||
@@ -8,10 +9,23 @@ from modules.app import app
|
||||
scheduler = AsyncIOScheduler()
|
||||
|
||||
if configGet("post", "mode"):
|
||||
# for entry in configGet("time", "posting"):
|
||||
# dt_obj = datetime.strptime(entry, "%H:%M")
|
||||
# Is only used for debug now!
|
||||
scheduler.add_job(send_content, "interval", seconds=30, args=[app])
|
||||
# scheduler.add_job(send_content, "cron", hour=dt_obj.hour, minute=dt_obj.minute, args=[app])
|
||||
if configGet("use_interval", "posting"):
|
||||
scheduler.add_job(
|
||||
send_content,
|
||||
"interval",
|
||||
seconds=timeparse(configGet("interval", "posting")),
|
||||
args=[app],
|
||||
)
|
||||
else:
|
||||
for entry in configGet("time", "posting"):
|
||||
dt_obj = datetime.strptime(entry, "%H:%M")
|
||||
scheduler.add_job(
|
||||
send_content, "cron", hour=dt_obj.hour, minute=dt_obj.minute, args=[app]
|
||||
)
|
||||
|
||||
scheduler.add_job(register_commands, "date", run_date=datetime.now()+timedelta(seconds=10), args=[app])
|
||||
scheduler.add_job(
|
||||
register_commands,
|
||||
"date",
|
||||
run_date=datetime.now() + timedelta(seconds=10),
|
||||
args=[app],
|
||||
)
|
||||
|
@@ -1,27 +1,29 @@
|
||||
from datetime import datetime, timezone
|
||||
from datetime import datetime
|
||||
from os import makedirs, path
|
||||
from shutil import copyfileobj, rmtree
|
||||
from random import choice
|
||||
from shutil import rmtree
|
||||
from traceback import format_exc
|
||||
from uuid import uuid4
|
||||
from PIL import Image
|
||||
import aiofiles
|
||||
|
||||
from bson import ObjectId
|
||||
from pyrogram.client import Client
|
||||
from requests import get
|
||||
from classes.poster_client import PosterClient
|
||||
|
||||
from modules.api_client import authorize, move_pic, random_pic
|
||||
from modules.api_client import authorize, move_pic, random_pic, http_session
|
||||
from modules.database import col_sent, col_submitted
|
||||
from modules.logger import logWrite
|
||||
from modules.utils import configGet, locale
|
||||
|
||||
|
||||
async def send_content(app: Client):
|
||||
|
||||
async def send_content(app: PosterClient) -> None:
|
||||
try:
|
||||
|
||||
try:
|
||||
token = await authorize()
|
||||
except ValueError:
|
||||
await app.send_message(configGet("admin"), locale("api_creds_invalid", "message", locale=configGet("locale")))
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
locale("api_creds_invalid", "message", locale=configGet("locale")),
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
@@ -29,19 +31,37 @@ async def send_content(app: Client):
|
||||
except KeyError:
|
||||
logWrite(locale("post_empty", "console", locale=configGet("locale")))
|
||||
if configGet("error", "reports"):
|
||||
await app.send_message(configGet("admin"), locale("api_queue_empty", "message", locale=configGet("locale")))
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
locale("api_queue_empty", "message", locale=configGet("locale")),
|
||||
)
|
||||
return
|
||||
except ValueError:
|
||||
if configGet("error", "reports"):
|
||||
await app.send_message(configGet("admin"), locale("api_queue_error", "message", locale=configGet("locale")))
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
locale("api_queue_error", "message", locale=configGet("locale")),
|
||||
)
|
||||
return
|
||||
|
||||
response = get(f'{configGet("address", "posting", "api")}/photos/{pic[0]}', headers={"Authorization": f"Bearer {token}"}, stream=True)
|
||||
|
||||
if response.status_code != 200:
|
||||
logWrite(locale("post_invalid_pic", "console", locale=configGet("locale")).format(str(response.json())))
|
||||
response = await http_session.get(
|
||||
f'{configGet("address", "posting", "api")}/photos/{pic[0]}',
|
||||
headers={"Authorization": f"Bearer {token}"},
|
||||
)
|
||||
|
||||
if response.status != 200:
|
||||
logWrite(
|
||||
locale(
|
||||
"post_invalid_pic", "console", locale=configGet("locale")
|
||||
).format(response.status, str(response.json()))
|
||||
)
|
||||
if configGet("error", "reports"):
|
||||
await app.send_message(configGet("admin"), locale("post_invalid_pic", "message", locale=configGet("locale")).format(response.json()))
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
locale(
|
||||
"post_invalid_pic", "message", locale=configGet("locale")
|
||||
).format(response.status, response.json()),
|
||||
)
|
||||
|
||||
tmp_dir = str(uuid4())
|
||||
|
||||
@@ -49,42 +69,101 @@ async def send_content(app: Client):
|
||||
|
||||
tmp_path = path.join(tmp_dir, pic[1])
|
||||
|
||||
with open(path.join(configGet("tmp", "locations"), tmp_path), 'wb') as out_file:
|
||||
copyfileobj(response.raw, out_file)
|
||||
async with aiofiles.open(
|
||||
path.join(configGet("tmp", "locations"), tmp_path), "wb"
|
||||
) as out_file:
|
||||
await out_file.write(await response.read())
|
||||
|
||||
logWrite(
|
||||
f'Candidate {pic[1]} ({pic[0]}) is {path.getsize(path.join(configGet("tmp", "locations"), tmp_path))} bytes big',
|
||||
debug=True,
|
||||
)
|
||||
|
||||
if path.getsize(path.join(configGet("tmp", "locations"), tmp_path)) > 5242880:
|
||||
image = Image.open(path.join(configGet("tmp", "locations"), tmp_path))
|
||||
width, height = image.size
|
||||
image = image.resize((int(width / 2), int(height / 2)), Image.ANTIALIAS)
|
||||
if tmp_path.lower().endswith(".jpeg") or tmp_path.lower().endswith(".jpg"):
|
||||
image.save(
|
||||
path.join(configGet("tmp", "locations"), tmp_path),
|
||||
"JPEG",
|
||||
optimize=True,
|
||||
quality=50,
|
||||
)
|
||||
elif tmp_path.lower().endswith(".png"):
|
||||
image.save(
|
||||
path.join(configGet("tmp", "locations"), tmp_path),
|
||||
"PNG",
|
||||
optimize=True,
|
||||
compress_level=8,
|
||||
)
|
||||
image.close()
|
||||
|
||||
if path.getsize(path.join(configGet("tmp", "locations"), tmp_path)) > 5242880:
|
||||
rmtree(
|
||||
path.join(configGet("tmp", "locations"), tmp_dir), ignore_errors=True
|
||||
)
|
||||
raise BytesWarning
|
||||
|
||||
del response
|
||||
|
||||
submitted_caption = col_submitted.find_one( {"image": ObjectId(pic[0])} )
|
||||
submitted = col_submitted.find_one({"temp.file": pic[1]})
|
||||
|
||||
if submitted_caption is not None:
|
||||
caption = submitted_caption["caption"].strip()
|
||||
if submitted is not None and submitted["caption"] is not None:
|
||||
caption = submitted["caption"].strip()
|
||||
else:
|
||||
caption = ""
|
||||
|
||||
if (
|
||||
submitted is not None
|
||||
and configGet("enabled", "posting", "submitted_caption")
|
||||
and (
|
||||
(submitted["user"] not in app.admins)
|
||||
or (configGet("ignore_admins", "posting", "submitted_caption") is False)
|
||||
)
|
||||
):
|
||||
caption = (
|
||||
f"{caption}\n\n{configGet('text', 'posting', 'submitted_caption')}\n"
|
||||
)
|
||||
else:
|
||||
caption = f"{caption}\n\n"
|
||||
|
||||
if configGet("enabled", "caption"):
|
||||
if configGet("link", "caption") != None:
|
||||
caption = f"{caption}\n\n[{configGet('text', 'caption')}]({configGet('link', 'caption')})"
|
||||
caption = f"{caption}[{choice(configGet('text', 'caption'))}]({configGet('link', 'caption')})"
|
||||
else:
|
||||
caption = f"{caption}\n\n{configGet('text', 'caption')}"
|
||||
caption = f"{caption}{choice(configGet('text', 'caption'))}"
|
||||
else:
|
||||
caption = caption
|
||||
|
||||
try:
|
||||
sent = await app.send_photo(configGet("channel", "posting"), path.join(configGet("tmp", "locations"), tmp_path), caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
sent = await app.send_photo(
|
||||
configGet("channel", "posting"),
|
||||
path.join(configGet("tmp", "locations"), tmp_path),
|
||||
caption=caption,
|
||||
disable_notification=configGet("silent", "posting"),
|
||||
)
|
||||
except Exception as exp:
|
||||
logWrite(f"Could not send image {pic[1]} ({pic[0]}) due to {exp}")
|
||||
if configGet("error", "reports"):
|
||||
await app.send_message(configGet("admin"), locale("post_exception", "message", locale=configGet("locale")).format(exp, format_exc()))
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
locale(
|
||||
"post_exception", "message", locale=configGet("locale")
|
||||
).format(exp, format_exc()),
|
||||
)
|
||||
# rmtree(path.join(configGet("tmp", "locations"), tmp_dir), ignore_errors=True)
|
||||
return
|
||||
|
||||
col_sent.insert_one(
|
||||
{
|
||||
"date": datetime.now(tz=timezone.utc),
|
||||
"date": datetime.now(),
|
||||
"image": pic[0],
|
||||
"filename": pic[1],
|
||||
"channel": configGet("channel", "posting"),
|
||||
"caption": None if submitted_caption is None else submitted_caption["caption"].strip()
|
||||
"caption": None
|
||||
if (submitted is None or submitted["caption"] is None)
|
||||
else submitted["caption"].strip(),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -92,143 +171,31 @@ async def send_content(app: Client):
|
||||
|
||||
rmtree(path.join(configGet("tmp", "locations"), tmp_dir), ignore_errors=True)
|
||||
|
||||
logWrite(locale("post_sent", "console", locale=configGet("locale")).format(pic[0], str(configGet("channel", "posting")), caption.replace("\n", "%n"), str(configGet("silent", "posting"))))
|
||||
logWrite(
|
||||
locale("post_sent", "console", locale=configGet("locale")).format(
|
||||
pic[0],
|
||||
str(configGet("channel", "posting")),
|
||||
caption.replace("\n", "%n"),
|
||||
str(configGet("silent", "posting")),
|
||||
)
|
||||
)
|
||||
|
||||
except Exception as exp:
|
||||
logWrite(locale("post_exception", "console", locale=configGet("locale")).format(str(exp), format_exc()))
|
||||
logWrite(
|
||||
locale("post_exception", "console", locale=configGet("locale")).format(
|
||||
str(exp), format_exc()
|
||||
)
|
||||
)
|
||||
if configGet("error", "reports"):
|
||||
await app.send_message(configGet("admin"), locale("post_exception", "message", locale=configGet("locale")).format(exp, format_exc()))
|
||||
|
||||
|
||||
# async def send_content_old(app: Client):
|
||||
|
||||
# # Send post to channel
|
||||
# try:
|
||||
|
||||
# index = jsonLoad(configGet("index", "locations"))
|
||||
|
||||
# if configGet("api_based", "mode"):
|
||||
|
||||
# try:
|
||||
# pic = random_pic()
|
||||
# except:
|
||||
# logWrite(locale("post_empty", "console", locale=configGet("locale")))
|
||||
# if configGet("error", "reports"):
|
||||
# await app.send_message(configGet("admin"), locale("post_empty", "message", locale=configGet("locale")))
|
||||
# return
|
||||
|
||||
# token = authorize()
|
||||
|
||||
# response = get(f'{configGet("address", "posting", "api")}/photos/{pic[0]}', headers={"Authorization": f"Bearer {token}"}, stream=True)
|
||||
|
||||
# with open(configGet("tmp", "locations")+sep+pic[0]+".jpg", 'wb') as out_file:
|
||||
# copyfileobj(response.raw, out_file)
|
||||
|
||||
# del response
|
||||
|
||||
# candidate = configGet("tmp", "locations")+sep+pic[0]+".jpg"
|
||||
# candidate_file = pic[1]
|
||||
# ext_type = "photo"
|
||||
|
||||
# if not configGet("api_based", "mode"):
|
||||
|
||||
# list_queue = listdir(configGet("queue", "locations"))
|
||||
|
||||
# for file in list_queue:
|
||||
|
||||
# if not file in index["sent"]:
|
||||
|
||||
# ext_match = False
|
||||
|
||||
# for ext in configGet("photo", "posting", "extensions"):
|
||||
# if file.endswith(ext):
|
||||
# ext_match = True
|
||||
# ext_type = "photo"
|
||||
# break
|
||||
|
||||
# for ext in configGet("video", "posting", "extensions"):
|
||||
# if file.endswith(ext):
|
||||
# ext_match = True
|
||||
# ext_type = "video"
|
||||
# break
|
||||
|
||||
# if not ext_match:
|
||||
# list_queue.remove(file)
|
||||
|
||||
# else:
|
||||
# list_queue.remove(file)
|
||||
|
||||
# if len(list_queue) > 0:
|
||||
# candidate_file = choice(list_queue)
|
||||
# candidate = configGet("queue", "locations")+sep+candidate_file
|
||||
# else:
|
||||
# logWrite(locale("post_empty", "console", locale=configGet("locale")))
|
||||
# if configGet("error", "reports"):
|
||||
# await app.send_message(configGet("admin"), locale("post_empty", "message", locale=configGet("locale")))
|
||||
# return
|
||||
|
||||
# if candidate_file in index["captions"]:
|
||||
# caption = index["captions"][candidate_file]
|
||||
# else:
|
||||
# caption = ""
|
||||
|
||||
# if configGet("enabled", "caption"):
|
||||
# if configGet("link", "caption") != None:
|
||||
# caption = f"{caption}\n\n[{configGet('text', 'caption')}]({configGet('link', 'caption')})"
|
||||
# else:
|
||||
# caption = f"{caption}\n\n{configGet('text', 'caption')}"
|
||||
# else:
|
||||
# caption = caption
|
||||
|
||||
# if ext_type == "photo":
|
||||
|
||||
# if configGet("enabled", "caption"):
|
||||
# if configGet("link", "caption") != None:
|
||||
# sent = await app.send_photo(configGet("channel", "posting"), candidate, caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
# else:
|
||||
# sent = await app.send_photo(configGet("channel", "posting"), candidate, caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
# else:
|
||||
# sent = await app.send_photo(configGet("channel", "posting"), candidate, caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
|
||||
# elif ext_type == "video":
|
||||
|
||||
# if configGet("enabled", "caption"):
|
||||
# if configGet("link", "caption") != None:
|
||||
# sent = await app.send_video(configGet("channel", "posting"), candidate, caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
# else:
|
||||
# sent = await app.send_video(configGet("channel", "posting"), candidate, caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
# else:
|
||||
# sent = await app.send_video(configGet("channel", "posting"), candidate, caption=caption, disable_notification=configGet("silent", "posting"))
|
||||
|
||||
# else:
|
||||
# return
|
||||
|
||||
# if configGet("api_based", "mode"):
|
||||
# remove(configGet("tmp", "locations")+sep+pic[0]+".jpg")
|
||||
# move_pic(pic[0])
|
||||
|
||||
# index["sent"].append(candidate_file)
|
||||
# index["last_id"] = sent.id
|
||||
|
||||
# jsonSave(index, configGet("index", "locations"))
|
||||
|
||||
# if configGet("move_sent", "posting"):
|
||||
# move(candidate, configGet("sent", "locations")+sep+candidate_file)
|
||||
|
||||
# logWrite(locale("post_sent", "console", locale=configGet("locale")).format(candidate, ext_type, str(configGet("channel", "posting")), caption.replace("\n", "%n"), str(configGet("silent", "posting"))))
|
||||
|
||||
# if configGet("sent", "reports"):
|
||||
# await app.send_message(configGet("admin"), f"Posted `{candidate_file}`", disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup([
|
||||
# [InlineKeyboardButton(locale("post_view", "button", locale=configGet("locale")), url=sent.link)]
|
||||
# ]))
|
||||
|
||||
# except Exception as exp:
|
||||
# logWrite(locale("post_exception", "console", locale=configGet("locale")).format(str(exp), format_exc()))
|
||||
# if configGet("error", "reports"):
|
||||
# await app.send_message(configGet("admin"), locale("post_exception", "message", locale=configGet("locale")).format(exp, format_exc()))
|
||||
# pass
|
||||
|
||||
|
||||
# Work in progress
|
||||
# Check last posts forwards
|
||||
# check_forwards(app)
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
locale("post_exception", "message", locale=configGet("locale")).format(
|
||||
exp, format_exc()
|
||||
),
|
||||
)
|
||||
try:
|
||||
rmtree(
|
||||
path.join(configGet("tmp", "locations"), tmp_dir), ignore_errors=True
|
||||
)
|
||||
except:
|
||||
pass
|
||||
|
@@ -1,27 +0,0 @@
|
||||
from datetime import datetime, timezone
|
||||
from modules.utils import configGet
|
||||
from modules.database import col_users, col_banned
|
||||
from pyrogram.types.user_and_chats import User
|
||||
|
||||
def subLimit(user: User) -> None:
|
||||
if col_users.find_one_and_update({"user": user.id}, {"$set": {"cooldown": datetime.now(tz=timezone.utc)}}) is None:
|
||||
col_users.insert_one({"user": user.id, "cooldown": datetime.now(tz=timezone.utc)})
|
||||
|
||||
def subLimited(user: User) -> bool:
|
||||
if user.id == configGet("admin"):
|
||||
return False
|
||||
else:
|
||||
db_record = col_users.find_one({"user": user.id})
|
||||
if db_record is None:
|
||||
return False
|
||||
return True if (datetime.now(tz=timezone.utc) - db_record["cooldown"]).total_seconds() < configGet("timeout", "submission") else False
|
||||
|
||||
def subBlocked(user: User) -> bool:
|
||||
return False if col_banned.find_one({"user": user.id}) is None else True
|
||||
|
||||
def subBlock(user: User) -> None:
|
||||
if col_banned.find_one({"user": user.id}) is None:
|
||||
col_banned.insert_one({"user": user.id, "date": datetime.now(tz=timezone.utc)})
|
||||
|
||||
def subUnblock(user: User) -> None:
|
||||
col_banned.find_one_and_delete({"user": user.id})
|
195
modules/utils.py
195
modules/utils.py
@@ -1,36 +1,125 @@
|
||||
try:
|
||||
from ujson import JSONDecodeError as JSONDecodeError
|
||||
from ujson import loads, dumps
|
||||
except ModuleNotFoundError:
|
||||
from json import JSONDecodeError as JSONDecodeError
|
||||
from json import loads, dumps
|
||||
|
||||
from sys import exit
|
||||
from os import sep, kill
|
||||
from os import kill, makedirs
|
||||
from os import name as osname
|
||||
from os import path, sep
|
||||
from sys import exit
|
||||
from traceback import print_exc
|
||||
from typing import Any
|
||||
from zipfile import ZipFile
|
||||
|
||||
import aiofiles
|
||||
from ujson import JSONDecodeError, dumps, loads
|
||||
|
||||
from modules.logger import logWrite
|
||||
|
||||
def jsonLoad(filename):
|
||||
default_config = {
|
||||
"locale": "en",
|
||||
"locale_log": "en",
|
||||
"locale_fallback": "en",
|
||||
"owner": 0,
|
||||
"admins": [],
|
||||
"bot": {"api_id": 0, "api_hash": "", "bot_token": ""},
|
||||
"database": {
|
||||
"user": None,
|
||||
"password": None,
|
||||
"host": "127.0.0.1",
|
||||
"port": 27017,
|
||||
"name": "tgposter",
|
||||
},
|
||||
"mode": {"post": True, "submit": True},
|
||||
"reports": {"sent": False, "error": True, "startup": True, "shutdown": True},
|
||||
"logging": {"size": 512, "location": "logs"},
|
||||
"locations": {
|
||||
"tmp": "tmp",
|
||||
"data": "data",
|
||||
"cache": "cache",
|
||||
"sent": "data/sent",
|
||||
"queue": "data/queue",
|
||||
"index": "data/index.json",
|
||||
"locale": "locale",
|
||||
},
|
||||
"posting": {
|
||||
"channel": 0,
|
||||
"silent": False,
|
||||
"move_sent": False,
|
||||
"use_interval": False,
|
||||
"interval": "1h30m",
|
||||
"page_size": 300,
|
||||
"submitted_caption": {
|
||||
"enabled": True,
|
||||
"ignore_admins": True,
|
||||
"text": "#submitted",
|
||||
},
|
||||
"extensions": {
|
||||
"photo": ["jpg", "png", "gif", "jpeg"],
|
||||
"video": ["mp4", "avi", "mkv", "webm", "mov"],
|
||||
},
|
||||
"time": [
|
||||
"08:00",
|
||||
"10:00",
|
||||
"12:00",
|
||||
"14:00",
|
||||
"16:00",
|
||||
"18:00",
|
||||
"20:00",
|
||||
"22:00",
|
||||
],
|
||||
"api": {
|
||||
"address": "http://localhost:8054",
|
||||
"address_external": "https://photos.domain.com",
|
||||
"username": "",
|
||||
"password": "",
|
||||
"album": "",
|
||||
},
|
||||
},
|
||||
"caption": {"enabled": False, "link": None, "text": ["sample text"]},
|
||||
"submission": {
|
||||
"timeout": 30,
|
||||
"file_size": 15728640,
|
||||
"tmp_size": 15728640,
|
||||
"allow_duplicates": False,
|
||||
"send_uploaded_id": False,
|
||||
"require_confirmation": {"users": True, "admins": True},
|
||||
"mime_types": [
|
||||
"image/png",
|
||||
"image/gif",
|
||||
"image/jpeg",
|
||||
"video/mp4",
|
||||
"video/quicktime",
|
||||
],
|
||||
},
|
||||
"commands": ["start", "rules"],
|
||||
"commands_admin": ["import", "export", "shutdown"],
|
||||
}
|
||||
|
||||
|
||||
def jsonLoad(filename: str) -> Any:
|
||||
"""Loads arg1 as json and returns its contents"""
|
||||
with open(filename, "r", encoding='utf8') as file:
|
||||
with open(filename, "r", encoding="utf8") as file:
|
||||
try:
|
||||
output = loads(file.read())
|
||||
except JSONDecodeError:
|
||||
logWrite(f"Could not load json file {filename}: file seems to be incorrect!\n{print_exc()}")
|
||||
logWrite(
|
||||
f"Could not load json file {filename}: file seems to be incorrect!\n{print_exc()}"
|
||||
)
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
logWrite(f"Could not load json file {filename}: file does not seem to exist!\n{print_exc()}")
|
||||
logWrite(
|
||||
f"Could not load json file {filename}: file does not seem to exist!\n{print_exc()}"
|
||||
)
|
||||
raise
|
||||
file.close()
|
||||
return output
|
||||
|
||||
def jsonSave(contents, filename):
|
||||
|
||||
def jsonSave(contents: Any, filename: str) -> None:
|
||||
"""Dumps dict/list arg1 to file arg2"""
|
||||
try:
|
||||
with open(filename, "w", encoding='utf8') as file:
|
||||
file.write(dumps(contents, ensure_ascii=False, indent=4))
|
||||
with open(filename, "w", encoding="utf8") as file:
|
||||
file.write(
|
||||
dumps(
|
||||
contents, ensure_ascii=False, indent=4, escape_forward_slashes=False
|
||||
)
|
||||
)
|
||||
file.close()
|
||||
except Exception as exp:
|
||||
logWrite(f"Could not save json file {filename}: {exp}\n{print_exc()}")
|
||||
@@ -43,7 +132,7 @@ def configSet(key: str, value, *args: str):
|
||||
* key (str): The last key of the keys path.
|
||||
* value (str/int/float/list/dict/None): Some needed value.
|
||||
* *args (str): Path to key like: dict[args][key].
|
||||
"""
|
||||
"""
|
||||
this_dict = jsonLoad("config.json")
|
||||
string = "this_dict"
|
||||
for arg in args:
|
||||
@@ -56,6 +145,7 @@ def configSet(key: str, value, *args: str):
|
||||
jsonSave(this_dict, "config.json")
|
||||
return
|
||||
|
||||
|
||||
def configGet(key: str, *args: str):
|
||||
"""Get value of the config key
|
||||
Args:
|
||||
@@ -63,13 +153,25 @@ def configGet(key: str, *args: str):
|
||||
* *args (str): Path to key like: dict[args][key].
|
||||
Returns:
|
||||
* any: Value of provided key
|
||||
"""
|
||||
"""
|
||||
this_dict = jsonLoad("config.json")
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
try:
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
this_key[key]
|
||||
except KeyError:
|
||||
print(
|
||||
f"Could not find config key '{key}' under path {args}: falling back to default config",
|
||||
flush=True,
|
||||
)
|
||||
this_key = default_config
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
configSet(key, this_key[key], *args)
|
||||
return this_key[key]
|
||||
|
||||
|
||||
def locale(key: str, *args: str, locale=configGet("locale")):
|
||||
"""Get value of locale string
|
||||
Args:
|
||||
@@ -78,40 +180,73 @@ def locale(key: str, *args: str, locale=configGet("locale")):
|
||||
* locale (str): Locale to looked up in. Defaults to config's locale value.
|
||||
Returns:
|
||||
* any: Value of provided locale key
|
||||
"""
|
||||
if (locale == None):
|
||||
"""
|
||||
if locale == None:
|
||||
locale = configGet("locale")
|
||||
|
||||
|
||||
try:
|
||||
this_dict = jsonLoad(f'{configGet("locale", "locations")}{sep}{locale}.json')
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
this_dict = jsonLoad(f'{configGet("locale", "locations")}{sep}{configGet("locale")}.json')
|
||||
this_dict = jsonLoad(
|
||||
f'{configGet("locale", "locations")}{sep}{configGet("locale")}.json'
|
||||
)
|
||||
except FileNotFoundError:
|
||||
try:
|
||||
this_dict = jsonLoad(f'{configGet("locale_fallback", "locations")}{sep}{configGet("locale")}.json')
|
||||
this_dict = jsonLoad(
|
||||
f'{configGet("locale_fallback", "locations")}{sep}{configGet("locale")}.json'
|
||||
)
|
||||
except:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
|
||||
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
|
||||
|
||||
try:
|
||||
return this_key[key]
|
||||
except KeyError:
|
||||
return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
|
||||
|
||||
|
||||
async def extract_and_save(handle: ZipFile, filename: str, destpath: str):
|
||||
"""Extract and save file from archive
|
||||
|
||||
Args:
|
||||
* handle (ZipFile): ZipFile handler
|
||||
* filename (str): File base name
|
||||
* path (str): Path where to store
|
||||
"""
|
||||
data = handle.read(filename)
|
||||
filepath = path.join(destpath, filename)
|
||||
try:
|
||||
makedirs(path.dirname(filepath), exist_ok=True)
|
||||
async with aiofiles.open(filepath, "wb") as fd:
|
||||
await fd.write(data)
|
||||
logWrite(f"Unzipped {filename}", debug=True)
|
||||
except IsADirectoryError:
|
||||
makedirs(filepath, exist_ok=True)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
return
|
||||
|
||||
|
||||
try:
|
||||
from psutil import Process
|
||||
except ModuleNotFoundError:
|
||||
print(locale("deps_missing", "console", locale=configGet("locale")), flush=True)
|
||||
exit()
|
||||
|
||||
def killProc(pid):
|
||||
|
||||
def killProc(pid: int) -> None:
|
||||
"""Kill process by its PID. Meant to be used to kill the main process of bot itself.
|
||||
|
||||
### Args:
|
||||
* pid (`int`): PID of the target
|
||||
"""
|
||||
if osname == "posix":
|
||||
from signal import SIGKILL
|
||||
|
||||
kill(pid, SIGKILL)
|
||||
else:
|
||||
p = Process(pid)
|
||||
p.kill()
|
||||
Process(pid).kill()
|
||||
|
Reference in New Issue
Block a user