Formatted everything with black

This commit is contained in:
Profitroll 2023-03-18 00:52:15 +01:00
parent dcf82ab6f2
commit c3dd6f61d6
4 changed files with 231 additions and 131 deletions

196
bwtbot.py
View File

@ -1,4 +1,4 @@
#-*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from os import getpid, system from os import getpid, system
from subprocess import call from subprocess import call
@ -15,19 +15,32 @@ config = jsonLoad("config.json")
owner_id = config["owner_id"] owner_id = config["owner_id"]
app = Client(config["bot_name"], api_id=config["api_id"], api_hash=config["api_hash"], bot_token=config["bot_token"]) app = Client(
config["bot_name"],
api_id=config["api_id"],
api_hash=config["api_hash"],
bot_token=config["bot_token"],
)
@app.on_message(~ filters.scheduled & filters.command(["setcard", "задать карту"], prefixes=["/", ""])) @app.on_message(
~filters.scheduled
& filters.command(["setcard", "задать карту"], prefixes=["/", ""])
)
async def setcard(_: Client, msg: Message): async def setcard(_: Client, msg: Message):
if userGet(msg.from_user.id, "context") is None: if userGet(msg.from_user.id, "context") is None:
userSet(msg.from_user.id, "context", "set") userSet(msg.from_user.id, "context", "set")
await msg.reply_text(string("send_number"), reply_markup=ForceReply(placeholder=string("enter_number"))) await msg.reply_text(
string("send_number"),
reply_markup=ForceReply(placeholder=string("enter_number")),
)
else: else:
await msg.reply_text(string("cancel_first")) await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["cancel", "відміна"], prefixes=["/", ""])) @app.on_message(
~filters.scheduled & filters.command(["cancel", "відміна"], prefixes=["/", ""])
)
async def cancel(_: Client, msg: Message): async def cancel(_: Client, msg: Message):
if userGet(msg.from_user.id, "context") is not None: if userGet(msg.from_user.id, "context") is not None:
userReset(msg.from_user.id, "context") userReset(msg.from_user.id, "context")
@ -36,7 +49,10 @@ async def cancel(_: Client, msg: Message):
await msg.reply_text(string("cancel_none")) await msg.reply_text(string("cancel_none"))
@app.on_message(~ filters.scheduled & filters.command(["resetcard", "забути картку"], prefixes=["/", ""])) @app.on_message(
~filters.scheduled
& filters.command(["resetcard", "забути картку"], prefixes=["/", ""])
)
async def resetcard(_: Client, msg: Message): async def resetcard(_: Client, msg: Message):
if userGet(msg.from_user.id, "context") is None: if userGet(msg.from_user.id, "context") is None:
if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]: if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]:
@ -50,52 +66,91 @@ async def resetcard(_: Client, msg: Message):
await msg.reply_text(string("cancel_first")) await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["balance", "баланс"], prefixes=["/", ""])) @app.on_message(
~filters.scheduled & filters.command(["balance", "баланс"], prefixes=["/", ""])
)
async def balance(_: Client, msg: Message): async def balance(_: Client, msg: Message):
if userGet(msg.from_user.id, "context") is None: if userGet(msg.from_user.id, "context") is None:
try: try:
if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]: if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]:
await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING) await app.send_chat_action(
water_left = await getWaterLeft(userGet(msg.from_user.id, "card"), msg.from_user.id, app) chat_id=msg.chat.id, action=ChatAction.TYPING
)
water_left = await getWaterLeft(
userGet(msg.from_user.id, "card"), msg.from_user.id, app
)
if water_left == "": if water_left == "":
await msg.reply_text(string("error_new").format(f'https://bwtaqua.com.ua/card-topup/?id={userGet(msg.from_user.id, "card")}')) await msg.reply_text(
string("error_new").format(
f'https://bwtaqua.com.ua/card-topup/?id={userGet(msg.from_user.id, "card")}'
)
)
# raise EmptyCardException("Card information is empty") # raise EmptyCardException("Card information is empty")
elif water_left == "Failure": elif water_left == "Failure":
await msg.reply_text(string("error_occured").format(string("get_number"))) await msg.reply_text(
appendLog(f"User {str(msg.from_user.id)} could not get left water amount") string("error_occured").format(string("get_number"))
)
appendLog(
f"User {str(msg.from_user.id)} could not get left water amount"
)
else: else:
await msg.reply_text(string("card_balance").format(water_left)) await msg.reply_text(string("card_balance").format(water_left))
appendLog(f"User {str(msg.from_user.id)} has {water_left} liters remaining") appendLog(
f"User {str(msg.from_user.id)} has {water_left} liters remaining"
)
else: else:
await msg.reply_text(string("card_not_linked").format(string("get_number"))) await msg.reply_text(
appendLog(f"User {str(msg.from_user.id)} tried to get balance without card set") string("card_not_linked").format(string("get_number"))
)
appendLog(
f"User {str(msg.from_user.id)} tried to get balance without card set"
)
except Exception as exp: except Exception as exp:
if msg.from_user.id != config["owner_id"]: if msg.from_user.id != config["owner_id"]:
await msg.reply_text(string("error_occured").format(string("get_number"))) await msg.reply_text(
await app.send_message(owner_id, f"Error occured by {str(msg.from_user.id)}:\nException: `{exp}`\nTraceback: `{format_exc()}`") string("error_occured").format(string("get_number"))
)
await app.send_message(
owner_id,
f"Error occured by {str(msg.from_user.id)}:\nException: `{exp}`\nTraceback: `{format_exc()}`",
)
appendLog(f"User {str(msg.from_user.id)} could not get left water amount") appendLog(f"User {str(msg.from_user.id)} could not get left water amount")
else: else:
await msg.reply_text(string("cancel_first")) await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["topup", "refill", "поповнити"], prefixes=["/", ""])) @app.on_message(
~filters.scheduled
& filters.command(["topup", "refill", "поповнити"], prefixes=["/", ""])
)
async def topup_cmd(_: Client, msg: Message): async def topup_cmd(_: Client, msg: Message):
if userGet(msg.from_user.id, "context") is None: if userGet(msg.from_user.id, "context") is None:
try: try:
if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]: if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]:
await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING) await app.send_chat_action(
await msg.reply_text(string("top_up").format(str(userGet(msg.from_user.id, "card")))) chat_id=msg.chat.id, action=ChatAction.TYPING
)
await msg.reply_text(
string("top_up").format(str(userGet(msg.from_user.id, "card")))
)
appendLog(f"User {str(msg.from_user.id)} requested top up") appendLog(f"User {str(msg.from_user.id)} requested top up")
else: else:
await msg.reply_text(string("card_not_linked").format(string("get_number"))) await msg.reply_text(
appendLog(f"User {str(msg.from_user.id)} tried to request top up without card set") string("card_not_linked").format(string("get_number"))
)
appendLog(
f"User {str(msg.from_user.id)} tried to request top up without card set"
)
except Exception as exp: except Exception as exp:
await msg.reply_text(str(exp)) await msg.reply_text(str(exp))
else: else:
await msg.reply_text(string("cancel_first")) await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["start", "help", "допомога"], prefixes=["/", ""])) @app.on_message(
~filters.scheduled
& filters.command(["start", "help", "допомога"], prefixes=["/", ""])
)
async def help(_: Client, msg: Message): async def help(_: Client, msg: Message):
if userGet(msg.from_user.id, "context") is None: if userGet(msg.from_user.id, "context") is None:
await msg.reply_text(string("welcome").format(string("get_number"))) await msg.reply_text(string("welcome").format(string("get_number")))
@ -106,16 +161,20 @@ async def help(_: Client, msg: Message):
else: else:
await msg.reply_text(string("cancel_first")) await msg.reply_text(string("cancel_first"))
pid = getpid() pid = getpid()
@app.on_message(~ filters.scheduled & filters.command(["kill", "die", "shutdown"], prefixes="/"))
@app.on_message(
~filters.scheduled & filters.command(["kill", "die", "shutdown"], prefixes="/")
)
async def kill(_: Client, msg: Message): async def kill(_: Client, msg: Message):
if msg.from_user.id == owner_id: if msg.from_user.id == owner_id:
await msg.reply_text(f"Shutting down bot with pid **{pid}**") await msg.reply_text(f"Shutting down bot with pid **{pid}**")
system(f"kill -9 {pid}") system(f"kill -9 {pid}")
@app.on_message(~ filters.scheduled) @app.on_message(~filters.scheduled)
async def any_message_handler(app, msg): async def any_message_handler(app, msg):
if userGet(msg.from_user.id, "context") == "set": if userGet(msg.from_user.id, "context") == "set":
userSet(msg.from_user.id, "card", msg.text) userSet(msg.from_user.id, "card", msg.text)
@ -123,54 +182,63 @@ async def any_message_handler(app, msg):
appendLog(f"User {str(msg.from_user.id)} set card id to {msg.text}") appendLog(f"User {str(msg.from_user.id)} set card id to {msg.text}")
await msg.reply_text(string("card_linked").format(msg.text)) await msg.reply_text(string("card_linked").format(msg.text))
print(f'{nowtime()} {WHITE}Starting with PID {YELLOW}{pid}{RESET}')
app.start() # type: ignore print(f"{nowtime()} {WHITE}Starting with PID {YELLOW}{pid}{RESET}")
app.send_message(owner_id, f"Starting bot with pid **{pid}**") # type: ignore
app.set_bot_commands([ app.start() # type: ignore
BotCommand("help", "Меню допомоги"), app.send_message(owner_id, f"Starting bot with pid **{pid}**") # type: ignore
BotCommand("balance", "Баланс картки"),
BotCommand("topup", "Поповнити картку"), app.set_bot_commands(
BotCommand("setcard", "Прив'язати картку"), [
BotCommand("resetcard", "Відв'язати картку"), BotCommand("help", "Меню допомоги"),
BotCommand("cancel", "Відмінити операцію"), BotCommand("balance", "Баланс картки"),
BotCommand("topup", "Поповнити картку"),
BotCommand("setcard", "Прив'язати картку"),
BotCommand("resetcard", "Відв'язати картку"),
BotCommand("cancel", "Відмінити операцію"),
], ],
language_code="uk") # type: ignore language_code="uk",
) # type: ignore
app.set_bot_commands([ app.set_bot_commands(
BotCommand("help", "Меню допомоги"), [
BotCommand("balance", "Баланс картки"), BotCommand("help", "Меню допомоги"),
BotCommand("topup", "Поповнити картку"), BotCommand("balance", "Баланс картки"),
BotCommand("setcard", "Прив'язати картку"), BotCommand("topup", "Поповнити картку"),
BotCommand("resetcard", "Відв'язати картку"), BotCommand("setcard", "Прив'язати картку"),
BotCommand("cancel", "Відмінити операцію"), BotCommand("resetcard", "Відв'язати картку"),
BotCommand("cancel", "Відмінити операцію"),
], ],
language_code="ru") # type: ignore language_code="ru",
) # type: ignore
app.set_bot_commands([ app.set_bot_commands(
BotCommand("help", "Help menu"), [
BotCommand("balance", "Card's balance"), BotCommand("help", "Help menu"),
BotCommand("topup", "Refill card"), BotCommand("balance", "Card's balance"),
BotCommand("setcard", "Link card"), BotCommand("topup", "Refill card"),
BotCommand("resetcard", "Unlink card"), BotCommand("setcard", "Link card"),
BotCommand("cancel", "Cancel operation"), BotCommand("resetcard", "Unlink card"),
]) # type: ignore BotCommand("cancel", "Cancel operation"),
]
) # type: ignore
app.set_bot_commands([ app.set_bot_commands(
BotCommand("help", "Help menu"), [
BotCommand("balance", "Card's balance"), BotCommand("help", "Help menu"),
BotCommand("topup", "Refill card"), BotCommand("balance", "Card's balance"),
BotCommand("setcard", "Link card"), BotCommand("topup", "Refill card"),
BotCommand("resetcard", "Unlink card"), BotCommand("setcard", "Link card"),
BotCommand("shutdown", "Turn off the bot"), BotCommand("resetcard", "Unlink card"),
BotCommand("cancel", "Cancel operation"), BotCommand("shutdown", "Turn off the bot"),
BotCommand("cancel", "Cancel operation"),
], ],
scope=BotCommandScopeChat(chat_id=owner_id)) # type: ignore scope=BotCommandScopeChat(chat_id=owner_id),
) # type: ignore
idle() idle()
app.send_message(owner_id, f"Shutting down bot with pid **{pid}**") # type: ignore app.send_message(owner_id, f"Shutting down bot with pid **{pid}**") # type: ignore
print(f'\n{nowtime()} {WHITE}Shutting down with PID {YELLOW}{pid}{RESET}') print(f"\n{nowtime()} {WHITE}Shutting down with PID {YELLOW}{pid}{RESET}")
call(f'kill -9 {pid}', shell=True) call(f"kill -9 {pid}", shell=True)

View File

@ -1,4 +1,4 @@
#-*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from os import makedirs, stat from os import makedirs, stat
from gzip import open as gzipopen from gzip import open as gzipopen
@ -13,11 +13,12 @@ logs_folder = "logs/"
def jsonSave(filename, value): def jsonSave(filename, value):
with open(filename, 'w', encoding="utf-8") as f: with open(filename, "w", encoding="utf-8") as f:
f.write(dumps(value, indent=4, ensure_ascii=False)) f.write(dumps(value, indent=4, ensure_ascii=False))
def jsonLoad(filename): def jsonLoad(filename):
with open(filename, 'r', encoding="utf-8") as f: with open(filename, "r", encoding="utf-8") as f:
value = loads(f.read()) value = loads(f.read())
return value return value
@ -31,60 +32,64 @@ owner_id = config["owner_id"]
def nowtime(): def nowtime():
return f'{BBLACK}[{CYAN}{datetime.now().strftime("%H:%M:%S")}{BBLACK}]{RESET}' return f'{BBLACK}[{CYAN}{datetime.now().strftime("%H:%M:%S")}{BBLACK}]{RESET}'
def checkSize(): def checkSize():
global logs_folder, log_size global logs_folder, log_size
i = 0 i = 0
while i < 2: while i < 2:
try: try:
log = stat(logs_folder + 'latest.log') log = stat(logs_folder + "latest.log")
if (log.st_size / 1024) > log_size: if (log.st_size / 1024) > log_size:
with open(logs_folder + 'latest.log', 'rb') as f_in: with open(logs_folder + "latest.log", "rb") as f_in:
with gzipopen(f'{logs_folder}{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.zip', 'wb') as f_out: with gzipopen(
f'{logs_folder}{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.zip',
"wb",
) as f_out:
copyfileobj(f_in, f_out) copyfileobj(f_in, f_out)
open(logs_folder + 'latest.log', 'w').close() open(logs_folder + "latest.log", "w").close()
i = 2 i = 2
except FileNotFoundError: except FileNotFoundError:
try: try:
log = open(logs_folder + 'latest.log', 'a') log = open(logs_folder + "latest.log", "a")
open(logs_folder + 'latest.log', 'a').close() open(logs_folder + "latest.log", "a").close()
except: except:
try: try:
makedirs(logs_folder, exist_ok=True) makedirs(logs_folder, exist_ok=True)
log = open(logs_folder + 'latest.log', 'a') log = open(logs_folder + "latest.log", "a")
open(logs_folder + 'latest.log', 'a').close() open(logs_folder + "latest.log", "a").close()
except: except:
pass pass
i += 1 i += 1
def appendLog(message): def appendLog(message):
global logs_folder global logs_folder
checkSize() checkSize()
try: try:
log = open(logs_folder + 'latest.log', 'a') log = open(logs_folder + "latest.log", "a")
open(logs_folder + 'latest.log', 'a').close() open(logs_folder + "latest.log", "a").close()
except: except:
try: try:
makedirs(logs_folder, exist_ok=True) makedirs(logs_folder, exist_ok=True)
log = open(logs_folder + 'latest.log', 'a') log = open(logs_folder + "latest.log", "a")
open(logs_folder + 'latest.log', 'a').close() open(logs_folder + "latest.log", "a").close()
except: except:
sleep(2) sleep(2)
print('Log file could not be created') print("Log file could not be created")
return return
print(message, flush=True) print(message, flush=True)
log.write(f'[{datetime.now().strftime("%H:%M:%S | %d.%m.%Y")}] {message}\n') log.write(f'[{datetime.now().strftime("%H:%M:%S | %d.%m.%Y")}] {message}\n')
log.close() log.close()
@ -104,15 +109,17 @@ def userSet(userid, key: str, value):
database[str(userid)][key] = value database[str(userid)][key] = value
jsonSave("data/database.json", database) jsonSave("data/database.json", database)
def userReset(userid, key: str): def userReset(userid, key: str):
database = jsonLoad("data/database.json") database = jsonLoad("data/database.json")
del database[str(userid)][key] del database[str(userid)][key]
jsonSave("data/database.json", database) jsonSave("data/database.json", database)
def userGet(userid, key: str): def userGet(userid, key: str):
try: try:
return jsonLoad("data/database.json")[str(userid)][key] return jsonLoad("data/database.json")[str(userid)][key]
except KeyError: except KeyError:
return None return None
except FileNotFoundError: except FileNotFoundError:
return None return None

View File

@ -1,4 +1,4 @@
#-*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from os import makedirs, path from os import makedirs, path
from subprocess import check_output from subprocess import check_output
@ -9,39 +9,60 @@ from bs4 import BeautifulSoup
config = jsonLoad("config.json") config = jsonLoad("config.json")
class EmptyCardException(Exception): class EmptyCardException(Exception):
pass pass
async def getWaterLeft(cardid, filename, app=None):
url = f"https://bwtaqua.com.ua/card-topup/?id={cardid}"
try:
async def getWaterLeft(cardid, filename, app=None):
url = f"https://bwtaqua.com.ua/card-topup/?id={cardid}"
try:
# if path.exists(f"data/pages/{str(filename)}.html") is False: # if path.exists(f"data/pages/{str(filename)}.html") is False:
# run(["touch", f"data/pages/{str(filename)}.html"]) # run(["touch", f"data/pages/{str(filename)}.html"])
appendLog(f"Trying to get liters for url '{url}'") appendLog(f"Trying to get liters for url '{url}'")
if config["use_compiled_page_saver"] is True: if config["use_compiled_page_saver"] is True:
proc = check_output(["PageSaver/pageSaver", f"https://bwtaqua.com.ua/card-topup/?id={cardid}"]) #, ">", f"data/pages/{str(filename)}.html"]) proc = check_output(
[
"PageSaver/pageSaver",
f"https://bwtaqua.com.ua/card-topup/?id={cardid}",
]
) # , ">", f"data/pages/{str(filename)}.html"])
html_file = proc.decode("utf-8") html_file = proc.decode("utf-8")
else: else:
proc = check_output(["node", "./PageSaver/pageSaver.js", f"https://bwtaqua.com.ua/card-topup/?id={cardid}"]) #, ">", f"data/pages/{str(filename)}.html"]) proc = check_output(
[
"node",
"./PageSaver/pageSaver.js",
f"https://bwtaqua.com.ua/card-topup/?id={cardid}",
]
) # , ">", f"data/pages/{str(filename)}.html"])
html_file = proc.decode("utf-8") html_file = proc.decode("utf-8")
# with open(f'data/pages/{str(filename)}.html') as f: # with open(f'data/pages/{str(filename)}.html') as f:
# html_file = f.read() # html_file = f.read()
# f.close() # f.close()
soup = BeautifulSoup(html_file, 'html.parser')
output = (soup.find_all("h3", class_="headline headline_center headline_pink js-payment-balance")[0].getText()).replace("Твій баланс ", "").replace(" л", "") soup = BeautifulSoup(html_file, "html.parser")
appendLog(f"Parsed {output} liters of water remaining (user: {str(filename)}, cardid: {cardid})") output = (
(
soup.find_all(
"h3",
class_="headline headline_center headline_pink js-payment-balance",
)[0].getText()
)
.replace("Твій баланс ", "")
.replace(" л", "")
)
appendLog(
f"Parsed {output} liters of water remaining (user: {str(filename)}, cardid: {cardid})"
)
except Exception as exp: except Exception as exp:
appendLog(f"Exception occured: {exp} (user: {str(filename)}, cardid: {cardid})") appendLog(f"Exception occured: {exp} (user: {str(filename)}, cardid: {cardid})")
try: try:
@ -52,19 +73,23 @@ async def getWaterLeft(cardid, filename, app=None):
except NameError: except NameError:
tmp_name = "N/A" tmp_name = "N/A"
appendLog(f"'html_file' is not defined so I won't gather any tmp data") appendLog(f"'html_file' is not defined so I won't gather any tmp data")
if app != None: if app != None:
await app.send_message(config["owner_id"], f"**Exception occured:**\n • User: `{str(filename)}`\n • Card: [{cardid}]({url})\n • Exception: `{exp}`\n • TMP UUID: `{tmp_name}`\n • Traceback: `{format_exc()}`", disable_web_page_preview=True) await app.send_message(
config["owner_id"],
f"**Exception occured:**\n • User: `{str(filename)}`\n • Card: [{cardid}]({url})\n • Exception: `{exp}`\n • TMP UUID: `{tmp_name}`\n • Traceback: `{format_exc()}`",
disable_web_page_preview=True,
)
else: else:
appendLog(f'Exception occured and could not send to user: {exp}') appendLog(f"Exception occured and could not send to user: {exp}")
output = "Failure" output = "Failure"
return output return output
if __name__ == "__main__": if __name__ == "__main__":
cardid = input("Enter card number: ") cardid = input("Enter card number: ")
userid = input("Enter Telegram ID (optional): ") userid = input("Enter Telegram ID (optional): ")
print(f"Card has {str(getWaterLeft(cardid, userid, app=None))} l. left") print(f"Card has {str(getWaterLeft(cardid, userid, app=None))} l. left")

View File

@ -1,22 +1,22 @@
RESET = '\u001b[0m' RESET = "\u001b[0m"
BLACK = '\u001b[30m' BLACK = "\u001b[30m"
RED = '\u001b[31m' RED = "\u001b[31m"
GREEN = '\u001b[32m' GREEN = "\u001b[32m"
YELLOW = '\u001b[33m' YELLOW = "\u001b[33m"
BLUE = '\u001b[34m' BLUE = "\u001b[34m"
MAGENTA = '\u001b[35m' MAGENTA = "\u001b[35m"
CYAN = '\u001b[36m' CYAN = "\u001b[36m"
WHITE = '\u001b[37m' WHITE = "\u001b[37m"
BBLACK = '\u001b[30;1m' BBLACK = "\u001b[30;1m"
BRED = '\u001b[31;1m' BRED = "\u001b[31;1m"
BGREEN = '\u001b[32;1m' BGREEN = "\u001b[32;1m"
BYELLOW = '\u001b[33;1m' BYELLOW = "\u001b[33;1m"
BBLUE = '\u001b[34;1m' BBLUE = "\u001b[34;1m"
BMAGENTA = '\u001b[35;1m' BMAGENTA = "\u001b[35;1m"
BCYAN = '\u001b[36;1m' BCYAN = "\u001b[36;1m"
BWHITE = '\u001b[37;1m' BWHITE = "\u001b[37;1m"
ULINE = '\u001b[4m' ULINE = "\u001b[4m"
REVERSE = '\u001b[7m' REVERSE = "\u001b[7m"