Big and tasty update to v2.0
This commit is contained in:
103
modules/bwt.py
103
modules/bwt.py
@@ -1,103 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import logging
|
||||
from os import makedirs, path
|
||||
from subprocess import check_output
|
||||
from traceback import format_exc
|
||||
from uuid import uuid4
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from libbot import config_get
|
||||
|
||||
from modules.utils import *
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EmptyCardException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
async def getWaterLeft(cardid, filename, app=None):
|
||||
url = f"https://bwtaqua.com.ua/card-topup/?id={cardid}"
|
||||
|
||||
try:
|
||||
# if path.exists(f"data/pages/{str(filename)}.html") is False:
|
||||
# run(["touch", f"data/pages/{str(filename)}.html"])
|
||||
|
||||
logger.info(f"Trying to get liters for url '{url}'")
|
||||
|
||||
if await config_get("use_compiled_page_saver") is True:
|
||||
proc = check_output(
|
||||
[
|
||||
"PageSaver/pageSaver",
|
||||
f"https://bwtaqua.com.ua/card-topup/?id={cardid}",
|
||||
"./bin/chrome",
|
||||
]
|
||||
) # , ">", f"data/pages/{str(filename)}.html"])
|
||||
html_file = proc.decode("utf-8")
|
||||
else:
|
||||
proc = check_output(
|
||||
[
|
||||
"node",
|
||||
"./PageSaver/pageSaver.js",
|
||||
f"https://bwtaqua.com.ua/card-topup/?id={cardid}",
|
||||
"./bin/chrome",
|
||||
]
|
||||
) # , ">", f"data/pages/{str(filename)}.html"])
|
||||
html_file = proc.decode("utf-8")
|
||||
|
||||
# with open(f'data/pages/{str(filename)}.html') as f:
|
||||
# html_file = f.read()
|
||||
# f.close()
|
||||
|
||||
soup = BeautifulSoup(html_file, "html.parser")
|
||||
|
||||
output = (
|
||||
(
|
||||
soup.find_all(
|
||||
"h3",
|
||||
class_="headline headline_center headline_pink js-payment-balance",
|
||||
)[0].getText()
|
||||
)
|
||||
.replace("Твій баланс ", "")
|
||||
.replace(" л", "")
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Parsed {output} liters of water remaining (user: {str(filename)}, cardid: {cardid})"
|
||||
)
|
||||
|
||||
except Exception as exp:
|
||||
logger.exception(
|
||||
f"Exception occured: {exp} (user: {str(filename)}, cardid: {cardid})", exp
|
||||
)
|
||||
|
||||
try:
|
||||
tmp_name = str(uuid4())
|
||||
makedirs("tmp", exist_ok=True)
|
||||
with open(path.join("tmp", tmp_name), "w", encoding="utf-8") as f:
|
||||
f.write(html_file)
|
||||
except NameError:
|
||||
tmp_name = "N/A"
|
||||
logger.warning(f"'html_file' is not defined so I won't gather any tmp data")
|
||||
|
||||
if app != None:
|
||||
await app.send_message(
|
||||
app.owner,
|
||||
f"**Exception occured:**\n • User: `{str(filename)}`\n • Card: [{cardid}]({url})\n • Exception: `{exp}`\n • TMP UUID: `{tmp_name}`\n • Traceback: `{format_exc()}`",
|
||||
disable_web_page_preview=True,
|
||||
)
|
||||
else:
|
||||
logger.warning(f"Exception occurred and could not send to user: {exp}")
|
||||
|
||||
output = "Failure"
|
||||
|
||||
return output
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cardid = input("Enter card number: ")
|
||||
userid = input("Enter Telegram ID (optional): ")
|
||||
|
||||
print(f"Card has {str(getWaterLeft(cardid, userid, app=None))} l. left")
|
30
modules/bwt_scrape.py
Normal file
30
modules/bwt_scrape.py
Normal file
@@ -0,0 +1,30 @@
|
||||
from typing import Union
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from selenium import webdriver
|
||||
|
||||
|
||||
def get_balance(card_id: Union[str, int]) -> Union[str, None]:
|
||||
driver = webdriver.Chrome()
|
||||
|
||||
driver.get(f"https://bwtaqua.com.ua/card-topup/?id={card_id}")
|
||||
|
||||
html = driver.page_source
|
||||
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
|
||||
return (
|
||||
(
|
||||
soup.find_all(
|
||||
"h3",
|
||||
class_="headline headline_center headline_pink js-payment-balance",
|
||||
)[0].getText()
|
||||
)
|
||||
.replace("Твій баланс ", "")
|
||||
.replace(" л", "")
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
card = input("Type your card ID: ")
|
||||
print(get_balance(card))
|
11
modules/database.py
Normal file
11
modules/database.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""Module that provides all database collections"""
|
||||
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
|
||||
from libbot.sync import config_get
|
||||
|
||||
db: sqlite3.Connection = sqlite3.connect(Path(config_get("database")))
|
||||
cursor: sqlite3.Cursor = db.cursor()
|
||||
|
||||
cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER, card TEXT, locale TEXT)")
|
26
modules/migrator.py
Normal file
26
modules/migrator.py
Normal file
@@ -0,0 +1,26 @@
|
||||
from os import rename
|
||||
from pathlib import Path
|
||||
from typing import Mapping
|
||||
|
||||
from libbot.sync import json_read
|
||||
|
||||
from modules.database import cursor
|
||||
|
||||
|
||||
def migrate_database() -> None:
|
||||
"""Apply migrations from old JSON database to SQLite"""
|
||||
if not Path("data/database.json").exists():
|
||||
return
|
||||
|
||||
db_old: Mapping[str, Mapping[str, str]] = json_read(Path("data/database.json"))
|
||||
|
||||
for user, keys in db_old.items():
|
||||
user_locale = None if "locale" not in keys else keys["locale"]
|
||||
user_card = None if "card" not in keys else keys["card"]
|
||||
|
||||
cursor.execute(
|
||||
"INSERT INTO users VALUES (?, ?, ?)", (int(user), user_card, user_locale)
|
||||
)
|
||||
|
||||
cursor.connection.commit()
|
||||
rename(Path("data/database.json"), Path("data/database.migrated.json"))
|
3
modules/scheduler.py
Normal file
3
modules/scheduler.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
scheduler = AsyncIOScheduler()
|
@@ -1,33 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from libbot import json_read, sync
|
||||
|
||||
|
||||
async def string(key: str, *args: str, userlocale="uk"):
|
||||
locales = await json_read("strings.json")
|
||||
strings = locales[userlocale]
|
||||
string = strings
|
||||
for dict_key in args:
|
||||
string = string[dict_key]
|
||||
return string[key]
|
||||
|
||||
|
||||
def userSet(userid, key: str, value):
|
||||
database = sync.json_read("data/database.json")
|
||||
if str(userid) not in database:
|
||||
database[str(userid)] = {}
|
||||
database[str(userid)][key] = value
|
||||
sync.json_write(database, "data/database.json")
|
||||
|
||||
|
||||
def userReset(userid, key: str):
|
||||
database = sync.json_read("data/database.json")
|
||||
del database[str(userid)][key]
|
||||
sync.json_write(database, "data/database.json")
|
||||
|
||||
|
||||
def userGet(userid, key: str):
|
||||
try:
|
||||
return sync.json_read("data/database.json")[str(userid)][key]
|
||||
except (KeyError, FileNotFoundError):
|
||||
return None
|
||||
|
Reference in New Issue
Block a user