Merge branch 'master' into dev

This commit is contained in:
Profitroll 2024-03-27 21:23:46 +01:00
commit 7c2e636621
Signed by: profitroll
GPG Key ID: FA35CAB49DACD3B2
36 changed files with 762 additions and 467 deletions

11
.gitignore vendored
View File

@ -152,3 +152,14 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# Custom
config.json
*.session
*.session-journal
venv
venv_linux
venv_windows
.vscode
data/

21
.renovaterc Normal file
View File

@ -0,0 +1,21 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:base"
],
"baseBranches": [
"dev"
],
"packageRules": [
{
"matchUpdateTypes": [
"minor",
"patch",
"pin",
"digest"
],
"automerge": true
}
]
}

View File

@ -1,5 +0,0 @@
{
"dependencies": {
"puppeteer": "^14.4.0"
}
}

Binary file not shown.

View File

@ -1,18 +0,0 @@
// npm install https://github.com/GoogleChrome/puppeteer/
const puppeteer = require('puppeteer');
(async () => {
const url = process.argv[2];
const browser = await puppeteer.launch();
const page = await browser.newPage();
await page.goto(url, {waitUntil: 'load'});
const html = await page.content();
browser.close();
console.log(html);
})();

View File

@ -1,4 +1,5 @@
# BWTAqua
[![License: GPL v3](https://img.shields.io/badge/License-GPL_v3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0.html)
Simple yet helpful bot to check BWT Aqua's card balance
@ -6,23 +7,37 @@ Simple yet helpful bot to check BWT Aqua's card balance
> ⚠️ Please note that this branch is made for testing only so it can sometimes work strange or do not work at all.
## Requirements
* nodejs & npm
* python3
* Python 3.8+
* git
## Installation
1. Download package
1. `git clone https://git.end-play.xyz/profitroll/BWTAqua.git`
2. `cd BWTAqua`
2. Install needed modules:
* `python3 -m pip install -r requirements.txt`
3. Install and configure Selenium Server
4. Configure the bot:
1. `cd ..`
2. `nano config.json` (You can use any other text editor actually, for example `vim`)
5. Run the bot:
* `python3 bwtbot.py`
2. Create venv
1. `python3 -m venv .venv`
2. `source .venv/bin/activate`
3. Install needed modules
* `pip install -r requirements.txt`
4. Configure the bot
* `nano config.json` (You can use any other text editor actually, for example `vim`)
5. Run the bot
* `python main.py`
## Configuration
You can edit with vim, nano, whatever.
If you don't know where to find bot_token and your id - here you can find some hints: [get bot token](https://www.siteguarding.com/en/how-to-get-telegram-bot-api-token), [get your id](https://www.alphr.com/telegram-find-user-id/), [get api_hash and api_id](https://core.telegram.org/api/obtaining_api_id).
If you don't know where to find bot_token and your id - here you can find some hints:
[get bot token](https://www.siteguarding.com/en/how-to-get-telegram-bot-api-token),
[get your id](https://www.alphr.com/telegram-find-user-id/),
[get api_hash and api_id](https://core.telegram.org/api/obtaining_api_id).
## Upgrading from v1.x
If you have just installed your fresh and new v2.x, migrate the database by starting the bot with `--migrate` argument.

181
bwtbot.py
View File

@ -1,181 +0,0 @@
#-*- coding: utf-8 -*-
import traceback
from pyrogram import filters, idle
from pyrogram.types import ForceReply, BotCommand, BotCommandScopeChat
from pyrogram.enums.chat_action import ChatAction
from functions import *
from modules.colors import *
from modules.bwt import *
from modules.app import app
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
import subprocess
import os
config = jsonLoad("config.json")
owner_id = config["owner_id"]
driver = webdriver.Remote(
command_executor=config["selenium_address"],
options=webdriver.FirefoxOptions()
)
wait = WebDriverWait(driver, 10)
# driver.execute_script(f"window.open('about:blank', 'home');")
@app.on_message(~ filters.scheduled & filters.command(["setcard", "задать карту"], prefixes=["/", ""]))
async def setcard(_, msg):
if userGet(msg.from_user.id, "context") is None:
userSet(msg.from_user.id, "context", "set")
await msg.reply_text(string("send_number"), reply_markup=ForceReply(placeholder=string("enter_number")))
else:
await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["cancel", "відміна"], prefixes=["/", ""]))
async def cancel(_, msg):
if userGet(msg.from_user.id, "context") is not None:
userReset(msg.from_user.id, "context")
await msg.reply_text(string("cancel"))
else:
await msg.reply_text(string("cancel_none"))
@app.on_message(~ filters.scheduled & filters.command(["resetcard", "забути картку"], prefixes=["/", ""]))
async def resetcard(_, msg):
if userGet(msg.from_user.id, "context") is None:
if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]:
userReset(msg.from_user.id, "card")
await msg.reply_text(string("card_unlinked"))
appendLog(f"User {str(msg.from_user.id)} reseted his card")
else:
await msg.reply_text(string("card_not_linked").format(string("get_number")))
appendLog(f"User {str(msg.from_user.id)} tried to reset non-existent card")
else:
await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["balance", "баланс"], prefixes=["/", ""]))
async def balance(_, msg):
if userGet(msg.from_user.id, "context") is None:
try:
if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]:
await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING)
water_left = await getWaterLeft(driver, wait, userGet(msg.from_user.id, "card"), msg.from_user.id)
if water_left == "":
raise EmptyCardException("Card information is empty")
elif water_left == "Failure":
await msg.reply_text(string("error_occured").format(string("get_number")))
appendLog(f"User {str(msg.from_user.id)} could not get left water amount")
else:
await msg.reply_text(string("card_balance").format(water_left))
appendLog(f"User {str(msg.from_user.id)} has {water_left} liters remaining")
else:
await msg.reply_text(string("card_not_linked").format(string("get_number")))
appendLog(f"User {str(msg.from_user.id)} tried to get balance without card set")
except Exception as exp:
await msg.reply_text(string("error_occured").format(string("get_number")))
await app.send_message(owner_id, f"Error occured by {str(msg.from_user.id)}:\nException: `{exp}`\nTraceback: `{traceback.format_exc()}`")
appendLog(f"User {str(msg.from_user.id)} could not get left water amount")
else:
await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["topup", "refill", "поповнити"], prefixes=["/", ""]))
async def topup_cmd(_, msg):
if userGet(msg.from_user.id, "context") is None:
try:
if "card" in jsonLoad("data/database.json")[str(msg.from_user.id)]:
await app.send_chat_action(chat_id=msg.chat.id, action=ChatAction.TYPING)
await msg.reply_text(string("top_up").format(str(userGet(msg.from_user.id, "card"))))
appendLog(f"User {str(msg.from_user.id)} requested top up")
else:
await msg.reply_text(string("card_not_linked").format(string("get_number")))
appendLog(f"User {str(msg.from_user.id)} tried to request top up without card set")
except Exception as exp:
await msg.reply_text(exp)
else:
await msg.reply_text(string("cancel_first"))
@app.on_message(~ filters.scheduled & filters.command(["start", "help", "допомога"], prefixes=["/", ""]))
async def help(_, msg):
if userGet(msg.from_user.id, "context") is None:
await msg.reply_text(string("welcome").format(string("get_number")))
if msg.from_user.language_code in jsonLoad("strings.json"):
userSet(msg.from_user.id, "locale", msg.from_user.language_code)
else:
userSet(msg.from_user.id, "locale", "en")
else:
await msg.reply_text(string("cancel_first"))
pid = os.getpid()
@app.on_message(~ filters.scheduled & filters.command(["kill", "die", "shutdown"], prefixes="/"))
async def kill(_, msg):
if msg.from_user.id == owner_id:
await msg.reply_text(f"Shutting down bot with pid **{pid}**")
driver.quit()
os.system(f"kill -9 {pid}")
@app.on_message(~ filters.scheduled)
async def any_message_handler(app, msg):
if userGet(msg.from_user.id, "context") == "set":
userSet(msg.from_user.id, "card", msg.text)
userReset(msg.from_user.id, "context")
appendLog(f"User {str(msg.from_user.id)} set card id to {msg.text}")
await msg.reply_text(string("card_linked").format(msg.text))
print(f'{nowtime()} {WHITE}Starting with PID {YELLOW}{pid}{RESET}')
app.start()
app.send_message(owner_id, f"Starting bot with pid **{pid}**")
app.set_bot_commands([
BotCommand("help", "Меню допомоги"),
BotCommand("balance", "Баланс картки"),
BotCommand("topup", "Поповнити картку"),
BotCommand("setcard", "Прив'язати картку"),
BotCommand("resetcard", "Відв'язати картку"),
BotCommand("cancel", "Відмінити операцію"),
],
language_code="uk")
app.set_bot_commands([
BotCommand("help", "Меню допомоги"),
BotCommand("balance", "Баланс картки"),
BotCommand("topup", "Поповнити картку"),
BotCommand("setcard", "Прив'язати картку"),
BotCommand("resetcard", "Відв'язати картку"),
BotCommand("cancel", "Відмінити операцію"),
],
language_code="ru")
app.set_bot_commands([
BotCommand("help", "Help menu"),
BotCommand("balance", "Card's balance"),
BotCommand("topup", "Refill card"),
BotCommand("setcard", "Link card"),
BotCommand("resetcard", "Unlink card"),
BotCommand("cancel", "Cancel operation"),
])
app.set_bot_commands([
BotCommand("help", "Help menu"),
BotCommand("balance", "Card's balance"),
BotCommand("topup", "Refill card"),
BotCommand("setcard", "Link card"),
BotCommand("resetcard", "Unlink card"),
BotCommand("shutdown", "Turn off the bot"),
BotCommand("cancel", "Cancel operation"),
],
scope=BotCommandScopeChat(chat_id=owner_id))
idle()
driver.quit()
app.send_message(owner_id, f"Shutting down bot with pid **{pid}**")
print(f'\n{nowtime()} {WHITE}Shutting down with PID {YELLOW}{pid}{RESET}')
subprocess.call(f'kill -9 {pid}', shell=True)

28
classes/callbacks.py Normal file
View File

@ -0,0 +1,28 @@
from dataclasses import dataclass
from pyrogram.types import CallbackQuery
@dataclass
class CallbackLanguage:
language: str
@classmethod
def from_callback(cls, callback: CallbackQuery):
"""Parse callback query and extract language data from it.
### Args:
* callback (`CallbackQuery`): Callback query got from user interaction.
### Raises:
* `ValueError`: Raised when callback provided is not a language one.
### Returns:
* `CallbackLanguage`: Parsed callback query.
"""
action, language = str(callback.data).split(":")
if action.lower() != "language":
raise ValueError("Callback provided is not a language callback")
return cls(language)

24
classes/pyroclient.py Normal file
View File

@ -0,0 +1,24 @@
from typing import Union
from libbot.pyrogram.classes import PyroClient as LibPyroClient
from pyrogram.types import User
from classes.pyrouser import PyroUser
class PyroClient(LibPyroClient):
async def find_user(self, user: Union[int, User]) -> PyroUser:
"""Find User by it's ID or User object.
### Args:
* user (`Union[int, User]`): ID or User object to extract ID from.
### Returns:
* `PyroUser`: User in database representation.
"""
return (
await PyroUser.find(user)
if isinstance(user, int)
else await PyroUser.find(user.id, locale=user.language_code)
)

77
classes/pyrouser.py Normal file
View File

@ -0,0 +1,77 @@
import logging
from dataclasses import dataclass
from typing import Union
from modules.database import cursor
logger = logging.getLogger(__name__)
@dataclass
class PyroUser:
"""Dataclass of DB entry of a user"""
__slots__ = ("id", "card", "locale")
id: int
card: Union[str, None]
locale: Union[str, None]
@classmethod
async def find(
cls, id: int, card: Union[str, None] = None, locale: Union[str, None] = None
):
"""Find user in database and create new record if user does not exist.
### Args:
* id (`int`): User's Telegram ID
* card (`Union[str, None]`, *optional*): User's card number. Defaults to `None`.
* locale (`Union[str, None]`, *optional*): User's locale. Defaults to `None`.
### Raises:
* `RuntimeError`: Raised when user entry after insertion could not be found.
### Returns:
* `PyroUser`: User with its database data.
"""
db_entry = cursor.execute(
"SELECT id, card, locale FROM users WHERE id = ?", (id,)
).fetchone()
if db_entry is None:
cursor.execute("INSERT INTO users VALUES (?, ?, ?)", (id, card, locale))
cursor.connection.commit()
db_entry = cursor.execute(
"SELECT id, card, locale FROM users WHERE id = ?", (id,)
).fetchone()
if db_entry is None:
raise RuntimeError("Could not find inserted user entry.")
return cls(*db_entry)
async def update_locale(self, locale: Union[str, None]) -> None:
"""Change user's locale stored in the database.
### Args:
* locale (`Union[str, None]`): New locale to be set.
"""
logger.debug("%s's locale has been set to %s", self.id, locale)
cursor.execute(
"UPDATE users SET locale = ? WHERE id = ?",
(locale, self.id),
)
cursor.connection.commit()
async def update_card(self, card: Union[str, None]) -> None:
"""Change user's card stored in the database.
### Args:
* card (`Union[str, None]`): New card to be set.
"""
logger.debug("%s's card has been set to %s", self.id, card)
cursor.execute(
"UPDATE users SET card = ? WHERE id = ?",
(card, self.id),
)
cursor.connection.commit()

84
commands.json Normal file
View File

@ -0,0 +1,84 @@
{
"help": {
"scopes": [
{
"name": "BotCommandScopeDefault"
},
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"balance": {
"scopes": [
{
"name": "BotCommandScopeDefault"
},
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"topup": {
"scopes": [
{
"name": "BotCommandScopeDefault"
},
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"setcard": {
"scopes": [
{
"name": "BotCommandScopeDefault"
},
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"resetcard": {
"scopes": [
{
"name": "BotCommandScopeDefault"
},
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"language": {
"scopes": [
{
"name": "BotCommandScopeDefault"
},
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"shutdown": {
"scopes": [
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
},
"remove_commands": {
"scopes": [
{
"name": "BotCommandScopeChat",
"chat_id": "owner"
}
]
}
}

View File

@ -1,9 +0,0 @@
{
"owner_id": 0,
"log_size": 1024,
"api_id": 0,
"api_hash": "",
"bot_token": "",
"bot_name": "",
"selenium_address": "http://localhost:4444"
}

15
config_example.json Normal file
View File

@ -0,0 +1,15 @@
{
"locale": "en",
"bot": {
"owner": 0,
"api_id": 0,
"api_hash": "",
"bot_token": "",
"scoped_commands": true
},
"database": "data/database.db",
"reports": {
"chat_id": "owner"
},
"disabled_plugins": []
}

View File

@ -1 +0,0 @@
{}

View File

@ -1,123 +0,0 @@
#-*- coding: utf-8 -*-
import json
import os
import shutil
import gzip
import time
from modules.colors import *
from datetime import datetime
from pathlib import Path
path = Path(__file__).resolve().parent
days_path = str(path)+"/assets/days/"
users_path = str(path)+"/users/"
logs_folder = str(path)+"/logs/"
def jsonSave(filename, value):
with open(filename, 'w', encoding="utf-8") as f:
json.dump(value, f, indent=4, ensure_ascii=False)
f.close()
def jsonLoad(filename):
with open(filename, 'r', encoding="utf-8") as f:
value = json.load(f)
f.close()
return value
config = jsonLoad(f"{path}/config.json")
log_size = config["log_size"]
owner_id = config["owner_id"]
def nowtime():
return f'{BBLACK}[{CYAN}{datetime.now().strftime("%H:%M:%S")}{BBLACK}]{RESET}'
def checkSize():
global logs_folder, log_size
i = 0
while i < 2:
try:
log = os.stat(logs_folder + 'latest.log')
if (log.st_size / 1024) > log_size:
with open(logs_folder + 'latest.log', 'rb') as f_in:
with gzip.open(f'{logs_folder}{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.zip', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
open(logs_folder + 'latest.log', 'w').close()
i = 2
except FileNotFoundError:
try:
log = open(logs_folder + 'latest.log', 'a')
open(logs_folder + 'latest.log', 'a').close()
except:
try:
os.mkdir(logs_folder)
log = open(logs_folder + 'latest.log', 'a')
open(logs_folder + 'latest.log', 'a').close()
except:
pass
i += 1
def appendLog(message):
global logs_folder
checkSize()
try:
log = open(logs_folder + 'latest.log', 'a')
open(logs_folder + 'latest.log', 'a').close()
except:
try:
os.mkdir(logs_folder)
log = open(logs_folder + 'latest.log', 'a')
open(logs_folder + 'latest.log', 'a').close()
except:
time.sleep(2)
print('Log file could not be created')
return
log.write(f'[{datetime.now().strftime("%H:%M:%S | %d.%m.%Y")}] {message}\n')
log.close()
def string(key: str, *args: str, userlocale="uk"):
locales = jsonLoad("strings.json")
strings = locales[userlocale]
string = strings
for dict_key in args:
string = string[dict_key]
return string[key]
def userSet(userid, key: str, value):
database = jsonLoad("data/database.json")
if str(userid) not in database:
database[str(userid)] = {}
database[str(userid)][key] = value
jsonSave("data/database.json", database)
def userReset(userid, key: str):
database = jsonLoad("data/database.json")
del database[str(userid)][key]
jsonSave("data/database.json", database)
def userGet(userid, key: str):
try:
return jsonLoad("data/database.json")[str(userid)][key]
except KeyError:
return None
except FileNotFoundError:
return None

40
locale/en.json Normal file
View File

@ -0,0 +1,40 @@
{
"metadata": {
"flag": "🇬🇧",
"name": "English",
"codes": [
"en",
"en-US",
"en-GB"
]
},
"commands": {
"help": "Help menu",
"balance": "Card's balance",
"topup": "Refill the card",
"setcard": "Link the card",
"resetcard": "Unlink the card",
"language": "Change bot's language",
"shutdown": "Turn the bot off",
"remove_commands": "Unregister all commands"
},
"messages": {
"welcome": "Welcome!\n\nThis bot allows you to get liters left on your personal BWT card.\n\n**Commands:**\n • /balance get card balance\n • /setcard link your card\n • /resetcard unlink your card\n\n{notice}\n\nDeveloper **is not affiliated with BWT Aqua** and this bot is made for personal usage only.",
"cancel": "Operation cancelled",
"card_balance": "Card's balance is {balance} l. of water",
"card_linked": "Linked card: `{card_id}`\n\nPlease, make sure the number is correct before using the bot",
"card_not_linked": "You don't have any linked card.\n\nВYou can set it using /setcard\n\n{notice}",
"card_unlinked": "Card was unlinked from your Telegram",
"card_error": "An error occurred while getting the amount of remaining water on the card.\n\nBWT seems to return empty string to balance requests from bot's server lately, as well as bot cannot use BWT's \"clean\" API to get this data.\n\nTo check your balance you can use official [BWT App](https://bwtaqua.com.ua/en/#app) or simply bookmark this page: {link}.",
"get_number": "**Get card number (Var. 1):**\nOn the front bottom side of your card, number may be found\n\n**Get card number (Var. 2):**\n1. Scan QR on the card\n2. Open webpage from code\n3. Number should be found in **Номер карти \"Здорова Вода\"** or **BWT Aqua card number** fields",
"locale_choice": "Alright. Please choose the language using keyboard below.",
"send_number": "Please, send your card number\nIf you want to abort this operation, use /cancel",
"top_up": "[Click here to top up](https://bwtaqua.com.ua/en/card-topup/?id={card_id})"
},
"callbacks": {
"locale_set": "Your language now is: {locale}"
},
"force_replies": {
"enter_number": "Enter card number"
}
}

39
locale/uk.json Normal file
View File

@ -0,0 +1,39 @@
{
"metadata": {
"flag": "🇺🇦",
"name": "Українська",
"codes": [
"uk",
"uk-UA"
]
},
"commands": {
"help": "Меню допомоги",
"balance": "Баланс картки",
"topup": "Поповнити картку",
"setcard": "Прив'язати картку",
"resetcard": "Відв'язати картку",
"language": "Змінити мову бота",
"shutdown": "Вимкнути бота",
"remove_commands": "Видалити всі команди"
},
"messages": {
"welcome": "Привіт-привіт!\n\nЦей бот дозволяє дізнатись скільки літрів залишилось на вашій карточці.\n\n**Команди:**\n • /balance дізнатись баланс карти\n • /setcard прив'язати карту\n • /resetcard відв'язати карту\n\n{notice}\n\nРозробник **не має жодного відношення до BWT Aqua**, а бот створений лише для особистого, некомерційного використання.",
"cancel": "Операцію скасовано",
"card_balance": "На карточці {balance} л. води",
"card_linked": "Прив'язана карточка: `{card_id}`\n\nБудь ласка, упевніться що номер правильний перед використанням інших команд",
"card_not_linked": "У вас немає прив'язаної картки.\n\nВи можете зробити це за допомогою команди /setcard\n\n{notice}",
"card_unlinked": "Картку відв'язано від вашого Telegram",
"card_error": "При отриманні води на карточці виникла помилка.\n\nОстаннім часом BWT часто повертає нашому серверу порожні строки замість балансу. На жаль, бот не може використовувати \"чисте\" API BWT для отримання даних про баланс, оскільки воно не є публічним.\n\nДля перевірки балансу рекомендуємо користуватись офіційним [додатком BWT](https://bwtaqua.com.ua/#app) або просто додати цю сторінку у закладки: {link}.",
"get_number": "**Дізнатись номер картки (Вар. 1):**\nЗ лицевої сторони картки знизу може бут вказано номер цієї картки\n\n**Дізнатись номер картки (Вар. 2):**\n1. Відсканувати QR код на картці\n2. Відкрити веб-сторінку з кода\n3. Номер буде знаходитись в полі **Номер карти \"Здорова Вода\"** або **Номер карти BWT Aqua**",
"locale_choice": "Гаразд. Будь ласка, оберіть мову за допомогою клавіатури нижче.",
"send_number": "Будь ласка, надішліть номер вашої картки\nЯкщо ви хочете скасувати цю операцію, використовуйте /cancel",
"top_up": "[Натисніть для поповнення](https://bwtaqua.com.ua/card-topup/?id={card_id})"
},
"callbacks": {
"locale_set": "Встановлено мову: {locale}"
},
"force_replies": {
"enter_number": "Введіть номер картки"
}
}

View File

66
main.py Normal file
View File

@ -0,0 +1,66 @@
import contextlib
import logging
from argparse import ArgumentParser
from os import getpid
from pathlib import Path
from convopyro import Conversation
from libbot import sync
from classes.pyroclient import PyroClient
from modules.database import cursor
from modules.migrator import migrate_database
from modules.scheduler import scheduler
logging.basicConfig(
level=logging.INFO,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]",
)
logger = logging.getLogger(__name__)
parser = ArgumentParser(
prog="BWTAqua Bot",
description="Small web scraper for BWT cards' balance parsing",
)
parser.add_argument("--migrate", action="store_true")
args = parser.parse_args()
with contextlib.suppress(ImportError):
import uvloop
uvloop.install()
def main():
client = PyroClient(
scheduler=scheduler, commands_source=sync.json_read(Path("commands.json"))
)
Conversation(client)
if args.migrate:
migrate_database()
elif Path("data/database.json").exists():
logger.info(
"You have an old unmigrated JSON database. Start the bot with --migrate argument to migrate the database to SQLite."
)
try:
client.run()
except KeyboardInterrupt:
logger.warning("Forcefully shutting down with PID %s...", getpid())
finally:
if client.scheduler is not None:
client.scheduler.shutdown()
cursor.close()
cursor.connection.commit()
cursor.connection.close()
exit()
if __name__ == "__main__":
main()

View File

@ -1,63 +0,0 @@
#-*- coding: utf-8 -*-
import os
import traceback
from typing import Union
from selenium import webdriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from functions import *
from modules.app import app
from bs4 import BeautifulSoup
config = jsonLoad("config.json")
class EmptyCardException(Exception):
pass
async def getWaterLeft(driver: webdriver.Remote, wait: WebDriverWait, cardid: Union[str, int], filename: int):
url = f"https://bwtaqua.com.ua/card-topup/?id={cardid}"
try:
# driver.execute_script(f"window.open('about:blank', '{filename}');")
# driver.switch_to.window(f"{filename}")
driver.get(f"https://bwtaqua.com.ua/card-topup/?id={cardid}")
wait.until(EC.presence_of_element_located((By.CLASS_NAME, 'js-payment-balance')))
page_source = driver.page_source
driver.execute_script("window.stop();")
with open(f'data/pages/{filename}.html', "w", encoding="utf-8") as f:
f.write(page_source)
output = driver.find_element(By.CLASS_NAME, "js-payment-balance").text.replace("Твій баланс ", "").replace(" л", "")
# soup = BeautifulSoup(page_source, 'html.parser')
# output = (soup.find_all("h3", class_="headline headline_center headline_pink js-payment-balance")[0].getText()).replace("Твій баланс ", "").replace(" л", "")
appendLog(f"Parsed {output} liters of water remaining (user: {filename}, cardid: {cardid})")
except Exception as exp:
appendLog(f"Exception occured: {exp} (user: {filename}, cardid: {cardid})")
if app != None:
await app.send_message(config["owner_id"], f"**Exception occured:**\n • User: `{filename}`\n • Card: [{cardid}]({url})\n • Exception: `{exp}`\n • Traceback: `{traceback.format_exc()}`", disable_web_page_preview=True)
else:
print(f'Exception occured and could not send to user: {exp}')
output = "Failure"
return output
if __name__ == "__main__":
cardid = input("Enter card number: ")
userid = input("Enter Telegram ID (optional): ")
print(f"Card has {str(getWaterLeft(cardid, userid, app=None))} l. left")

36
modules/bwt_scrape.py Normal file
View File

@ -0,0 +1,36 @@
from typing import Union
from bs4 import BeautifulSoup
from selenium.webdriver import Chrome
from selenium.webdriver.chrome.options import Options
def get_balance(card_id: Union[str, int]) -> Union[str, None]:
chrome_options = Options()
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--headless')
chrome_options.add_argument('--disable-dev-shm-usage')
driver = Chrome(options=chrome_options)
driver.get(f"https://bwtaqua.com.ua/card-topup/?id={card_id}")
html = driver.page_source
soup = BeautifulSoup(html, "html.parser")
return (
(
soup.find_all(
"h3",
class_="headline headline_center headline_pink js-payment-balance",
)[0].getText()
)
.replace("Твій баланс ", "")
.replace(" л", "")
)
if __name__ == "__main__":
card = input("Type your card ID: ")
print(get_balance(card))

View File

@ -1,22 +0,0 @@
RESET = '\u001b[0m'
BLACK = '\u001b[30m'
RED = '\u001b[31m'
GREEN = '\u001b[32m'
YELLOW = '\u001b[33m'
BLUE = '\u001b[34m'
MAGENTA = '\u001b[35m'
CYAN = '\u001b[36m'
WHITE = '\u001b[37m'
BBLACK = '\u001b[30;1m'
BRED = '\u001b[31;1m'
BGREEN = '\u001b[32;1m'
BYELLOW = '\u001b[33;1m'
BBLUE = '\u001b[34;1m'
BMAGENTA = '\u001b[35;1m'
BCYAN = '\u001b[36;1m'
BWHITE = '\u001b[37;1m'
ULINE = '\u001b[4m'
REVERSE = '\u001b[7m'

11
modules/database.py Normal file
View File

@ -0,0 +1,11 @@
"""Module that provides all database collections"""
import sqlite3
from pathlib import Path
from libbot.sync import config_get
db: sqlite3.Connection = sqlite3.connect(Path(config_get("database")))
cursor: sqlite3.Cursor = db.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER, card TEXT, locale TEXT)")

26
modules/migrator.py Normal file
View File

@ -0,0 +1,26 @@
from os import rename
from pathlib import Path
from typing import Mapping
from libbot.sync import json_read
from modules.database import cursor
def migrate_database() -> None:
"""Apply migrations from old JSON database to SQLite"""
if not Path("data/database.json").exists():
return
db_old: Mapping[str, Mapping[str, str]] = json_read(Path("data/database.json"))
for user, keys in db_old.items():
user_locale = None if "locale" not in keys else keys["locale"]
user_card = None if "card" not in keys else keys["card"]
cursor.execute(
"INSERT INTO users VALUES (?, ?, ?)", (int(user), user_card, user_locale)
)
cursor.connection.commit()
rename(Path("data/database.json"), Path("data/database.migrated.json"))

3
modules/scheduler.py Normal file
View File

@ -0,0 +1,3 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()

View File

@ -0,0 +1,43 @@
import logging
from pyrogram import filters
from pyrogram.enums.chat_action import ChatAction
from pyrogram.types import Message
from classes.pyroclient import PyroClient
from modules.bwt_scrape import get_balance
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["balance"], prefixes=["/"]) # type: ignore
)
async def command_balance(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
if user.card is None:
logger.info("User %s tried to get balance without card set", user.id)
await message.reply_text(
app._("card_not_linked", "messages", locale=user.locale).format(
notice=app._("get_number", "messages", locale=user.locale)
)
)
return
await app.send_chat_action(chat_id=message.chat.id, action=ChatAction.TYPING)
balance = get_balance(user.card)
if balance is None or balance == "":
logger.warning("User %s could not get water balance of their card", user.id)
await message.reply_text(
app._("card_error", "messages", locale=user.locale).format(
link=f"https://bwtaqua.com.ua/card-topup/?id={user.card}"
)
)
return
logger.info("User %s has %s liters on balance", user.id, balance)
await message.reply_text(
app._("card_balance", "messages", locale=user.locale).format(balance=balance)
)

View File

@ -0,0 +1,12 @@
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["remove_commands"], prefixes=["/"]) # type: ignore
)
async def command_remove_commands(app: PyroClient, message: Message):
await message.reply_text("Okay.")
await app.remove_commands(command_sets=await app.collect_commands())

View File

@ -0,0 +1,30 @@
import logging
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled
& filters.command(["resetcard", "забути картку"], prefixes=["/", ""]) # type: ignore
)
async def command_resetcard(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
if user.card is None:
logger.info("User %s tried to reset their card, but it's null", user.id)
await message.reply_text(
app._("card_not_linked", "messages", locale=user.locale).format(
notice=app._("get_number", "messages", locale=user.locale)
)
)
return
await user.update_card(None)
logger.info("User %s has reset their card", user.id)
await message.reply_text(app._("card_unlinked", "messages", locale=user.locale))

View File

@ -0,0 +1,53 @@
import logging
from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled
& filters.command(["setcard", "задати картку"], prefixes=["/", ""]) # type: ignore
)
async def command_setcard(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(
app._("send_number", "messages", locale=user.locale),
reply_markup=ForceReply(
placeholder=app._("enter_number", "force_replies", locale=user.locale)
),
)
answer = await listen_message(app, message.chat.id, timeout=500)
if (
answer is None
or answer.text is None
or answer.text.strip()
in [
"/cancel",
"cancel",
"/відміна",
"відміна",
]
):
await message.reply_text(
app._("cancel", "messages", locale=user.locale),
reply_markup=ReplyKeyboardRemove(),
)
return
await user.update_card(answer.text)
logger.info("User %s set their card id to %s", user.id, answer.text)
await message.reply_text(
app._("card_linked", "messages", locale=user.locale).format(
card_id=answer.text
),
reply_markup=ReplyKeyboardRemove(),
)

View File

@ -0,0 +1,15 @@
import asyncio
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled
& filters.command(["shutdown", "reboot", "restart"], prefixes=["/", ""]) # type: ignore
)
async def command_shutdown(app: PyroClient, message: Message):
if message.from_user.id == app.owner:
asyncio.get_event_loop().create_task(app.stop())

17
plugins/commands/start.py Normal file
View File

@ -0,0 +1,17 @@
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["start", "welcome", "help"], prefixes=["/", ""]) # type: ignore
)
async def command_start(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(
app._("welcome", "messages", locale=user.locale).format(
notice=app._("get_number", "messages", locale=user.locale)
)
)

30
plugins/commands/topup.py Normal file
View File

@ -0,0 +1,30 @@
import logging
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled
& filters.command(["topup", "refill", "поповнити"], prefixes=["/", ""]) # type: ignore
)
async def command_topup(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
if user.card is None:
logger.info("User %s tried to get card's top-up link, but it's null", user.id)
await message.reply_text(
app._("card_not_linked", "messages", locale=user.locale).format(
notice=app._("get_number", "messages", locale=user.locale)
)
)
return
logger.info("User %s requested top-up link", user.id)
await message.reply_text(
app._("top_up", "messages", locale=user.locale).format(card_id=user.card)
)

45
plugins/language.py Normal file
View File

@ -0,0 +1,45 @@
from typing import List
from pykeyboard import InlineButton, InlineKeyboard
from pyrogram import filters
from pyrogram.types import CallbackQuery, Message
from classes.callbacks import CallbackLanguage
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["language"], prefixes=["/"]) # type: ignore
)
async def command_language(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
keyboard = InlineKeyboard(row_width=2)
buttons: List[InlineButton] = []
for locale, data in app.in_every_locale("metadata").items():
buttons.append(
InlineButton(f"{data['flag']} {data['name']}", f"language:{locale}")
)
keyboard.add(*buttons)
await message.reply_text(
app._("locale_choice", "messages", locale=user.locale),
reply_markup=keyboard,
)
@PyroClient.on_callback_query(filters.regex(r"language:[\s\S]*")) # type: ignore
async def callback_language(app: PyroClient, callback: CallbackQuery):
user = await app.find_user(callback.from_user)
parsed = CallbackLanguage.from_callback(callback)
await user.update_locale(parsed.language)
await callback.answer(
app._("locale_set", "callbacks", locale=parsed.language).format(
locale=app._("name", "metadata", locale=parsed.language)
),
show_alert=True,
)

View File

@ -1,2 +1,10 @@
beautifulsoup4==4.11.1
selenium==4.7.2
apscheduler~=3.10.4
beautifulsoup4~=4.12.2
convopyro==0.5
pykeyboard==0.1.7
requests-html==0.10.0
selenium~=4.19.0
tgcrypto==1.2.5
uvloop==0.19.0
--extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
libbot[speed,pyrogram]==3.0.0

View File

@ -1,32 +0,0 @@
{
"en": {
"welcome": "Welcome!\n\nThis bot allows you to get liters left on your personal BWT card.\n\n**Commands:**\n • /balance get card balance\n • /setcard link your card\n • /resetcard unlink your card\n\n{0}\n\nDeveloper **is not affiliated with BWT Aqua** and this bot is made for personal usage only.",
"get_number": "**Get card number (Var. 1):**\nOn the front bottom side of your card, number may be found\n\n**Get card number (Var. 2):**\n1. Scan QR on the card\n2. Open webpage from code\n3. Numer should be found in **Номер карти \"Здорова Вода\"** or **Номер карти BWT Aqua** fields",
"card_linked": "Linked card: `{0}`\n\nPlease, make sure the number is correct before using the bot",
"card_unlinked": "Card was unlinked from your Telegram",
"card_not_linked": "You don't have any linked card.\n\nВы можете задать её с помощью команды /setcard\n\n{0}",
"error_occured": "An error occurred while getting the amount of remaining water on the card.\n\nPlease make sure the linked card number is correct. If you are sure that the bot is broken, please contact @profitroll.\n\nLink your card: /setcard\n\n{0}",
"card_balance": "Card's balance is {0} l. of water",
"top_up": "[Click here to top up](https://bwtaqua.com.ua/card-topup/?id={0})",
"cancel": "Operation cancelled",
"cancel_none": "Nothing to cancel",
"cancel_first": "Operation ongoing. Cancel the current one using /cancel to run this action",
"enter_number": "Enter card number",
"send_number": "Please, send your card number"
},
"uk": {
"welcome": "Привіт-привіт!\n\nЦей бот дозволяє дізнатись скільки літрів залишилось на вашій карточці.\n\n**Команди:**\n • /balance дізнатись баланс карти\n • /setcard приав'язати карту\n • /resetcard відв'язати карту\n\n{0}\n\nРозробник **не має жодного відношення до BWT Aqua**, а бот створений лише для особистого, некомерційного використання.",
"get_number": "**Дізнатись номер картки (Вар. 1):**\nЗ лицевої сторони картки знизу може бут вказано номер цієї картки\n\n**Дізнатись номер картки (Вар. 2):**\n1. Отсканувати QR код на картці\n2. Відкрити веб-сторінку з кода\n3. Номер буде знаходитись в полі **Номер карти \"Здорова Вода\"** або **Номер карти BWT Aqua**",
"card_linked": "Прив'язана карточка: `{0}`\n\nБудь ласка, упевніться що номер правильний перед використанням інших команд",
"card_unlinked": "Картку відв'язано від вашого Telegram",
"card_not_linked": "У вас немає прив'язаної картки.\n\nВи можете зробити це за допомогою команди /setcard\n\n{0}",
"error_occured": "При отриманні води на карточці виникла помилка.\n\nБудь ласка, упевніться що номер карти правильний. Якщо ви впевнені, що номер картки правильний та бот зламався зв'яжіться з @profitroll.\n\nПрив'язати карту: /setcard\n\n{0}",
"card_balance": "На карточці {0} л. води",
"top_up": "[Натисніть для поповнення](https://bwtaqua.com.ua/card-topup/?id={0})",
"cancel": "Операція відмінена",
"cancel_none": "Нема що відміняти",
"cancel_first": "Триває інша операція. Відмініть триваючу операцію командою /cancel щоб запустити іншу дію",
"enter_number": "Введіть номер картки",
"send_number": "Будь ласка, надішліть номер вашої картки"
}
}