Second commit kind of
This commit is contained in:
parent
43bc98c6ef
commit
ce37099e2d
2
.gitignore
vendored
2
.gitignore
vendored
@ -152,3 +152,5 @@ cython_debug/
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
# Custom
|
||||
config.json
|
20
TASK.md
Normal file
20
TASK.md
Normal file
@ -0,0 +1,20 @@
|
||||
# ТЗ
|
||||
|
||||
## Algorithm
|
||||
|
||||
1. Лінка веде на бота.
|
||||
2. Перший пост від бота: вітання та питання, чи хоче людина доєднатися до українського ком'юніті фанатів Хололайва
|
||||
3. Дві кнопки: "Так-Ні"
|
||||
3а. Якщо "Ні", то бот вибачається та каже, що, коли захоче, то людина фрі ту джоін. Під цим кнопка "Я передумав, я хочу"
|
||||
4. Якщо "Так", бот кидає анкету та просить заповнити за пунктами. Після цього наступна відповідь людини пересилається в чат ХолоКиїв (лінку дамо тобі)
|
||||
5. З адмінської сторони має бути доступ до бота з окремим адмін-доступом, де можна для кожного юзера, який відправив анкету ботові, натиснути кнопку апрув або дінай
|
||||
5а. Якщо дінай, бот вибачається та каже повертатися, коли буде бажання втягнутися
|
||||
6. Якщо апрув, бот кидає лінку до чату
|
||||
|
||||
Можно кидать не просто "заполните анкету", а что бы бот поочерёдно задавал вопросы. И после каждого ответа, он их сохранял в отдельное облако ответов
|
||||
|
||||
генерувати одноразову лінку
|
||||
|
||||
Для полного наворота, если позволят навыки, то можно определёнными командами, без поиска по хештегу, сразу смотреть анкеты нужных участников в боте (либо в самом чате)
|
||||
|
||||
перевірка віку
|
28
config_example.json
Normal file
28
config_example.json
Normal file
@ -0,0 +1,28 @@
|
||||
{
|
||||
"owner": 0,
|
||||
"admins": [],
|
||||
"bot": {
|
||||
"api_id": 0,
|
||||
"api_hash": "",
|
||||
"bot_token": ""
|
||||
},
|
||||
"logging": {
|
||||
"size": 512,
|
||||
"location": "logs"
|
||||
},
|
||||
"locations": {
|
||||
"data": "data",
|
||||
"sent": "data/sent",
|
||||
"queue": "data/queue",
|
||||
"index": "data/index.json",
|
||||
"submit": "data/submit.json",
|
||||
"blocked": "data/blocked.json",
|
||||
"locale": "locale"
|
||||
},
|
||||
"commands": {
|
||||
"start": "Start using the bot"
|
||||
},
|
||||
"commands_admin": {
|
||||
"reboot": "Restart the bot"
|
||||
}
|
||||
}
|
0
data/.gitkeep
Normal file
0
data/.gitkeep
Normal file
0
data/users/.gitkeep
Normal file
0
data/users/.gitkeep
Normal file
BIN
holochecker.session
Normal file
BIN
holochecker.session
Normal file
Binary file not shown.
BIN
holochecker.session-journal
Normal file
BIN
holochecker.session-journal
Normal file
Binary file not shown.
62
main.py
Normal file
62
main.py
Normal file
@ -0,0 +1,62 @@
|
||||
import asyncio
|
||||
from os import getpid, listdir
|
||||
from modules.utils import *
|
||||
|
||||
from pyrogram.client import Client
|
||||
from pyrogram import filters
|
||||
from pyrogram.types import InlineKeyboardMarkup, InlineKeyboardButton, BotCommand, BotCommandScopeChat
|
||||
from pyrogram import idle # type: ignore
|
||||
from pyrogram.errors.exceptions import bad_request_400
|
||||
|
||||
pid = getpid()
|
||||
|
||||
app = Client("holochecker", bot_token=configGet("bot_token", "bot"), api_id=configGet("api_id", "bot"), api_hash=configGet("api_hash", "bot"))
|
||||
|
||||
@app.on_message(~ filters.scheduled & filters.command(["kill", "die", "reboot"], prefixes=["", "/"]))
|
||||
async def cmd_kill(app, msg):
|
||||
|
||||
if (msg.from_user.id == configGet("owner")) or (msg.from_user.id in configGet("admins")):
|
||||
logWrite(f"Shutting down bot with pid {pid}")
|
||||
await msg.reply_text(f"Вимкнення бота з підом `{pid}`")
|
||||
killProc(pid)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
logWrite(f"Starting up with pid {pid}")
|
||||
|
||||
# Yes, it should be in some kind of async main() function but I don't give a shit.
|
||||
# I did compare performance and it's much more useful this way. Change my mind.
|
||||
app.start() # type: ignore
|
||||
|
||||
app.send_message(configGet("owner"), f"Starting up with pid `{pid}`") # type: ignore
|
||||
|
||||
# Registering user commands for fallback locale
|
||||
commands_list = []
|
||||
for command in configGet("commands"):
|
||||
commands_list.append(BotCommand(command, configGet("commands")[command]))
|
||||
app.set_bot_commands(commands_list) # type: ignore
|
||||
|
||||
# Registering admin commands
|
||||
commands_admin_list = []
|
||||
for command in configGet("commands"):
|
||||
commands_admin_list.append(BotCommand(command, configGet("commands")[command]))
|
||||
|
||||
for command in configGet("commands_admin"):
|
||||
commands_admin_list.append(BotCommand(command, configGet("commands_admin")[command]))
|
||||
|
||||
for admin in configGet("admins"):
|
||||
try:
|
||||
app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin)) # type: ignore
|
||||
except bad_request_400.PeerIdInvalid:
|
||||
pass
|
||||
|
||||
app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner"))) # type: ignore
|
||||
|
||||
idle()
|
||||
|
||||
app.send_message(configGet("owner"), f"Shutting with pid `{pid}`") # type: ignore
|
||||
|
||||
app.stop() # type: ignore
|
||||
|
||||
killProc(pid)
|
58
modules/logging.py
Normal file
58
modules/logging.py
Normal file
@ -0,0 +1,58 @@
|
||||
from ujson import loads
|
||||
from os import stat, makedirs, path, getcwd
|
||||
from gzip import open as gzipopen
|
||||
from shutil import copyfileobj
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
with open(getcwd()+path.sep+"config.json", "r", encoding='utf8') as file:
|
||||
json_contents = loads(file.read())
|
||||
log_size = json_contents["logging"]["size"]
|
||||
log_folder = json_contents["logging"]["location"]
|
||||
file.close()
|
||||
|
||||
# Check latest log size
|
||||
def checkSize(debug=False):
|
||||
|
||||
global log_folder
|
||||
|
||||
if debug:
|
||||
log_file = "debug.log"
|
||||
else:
|
||||
log_file = "latest.log"
|
||||
|
||||
try:
|
||||
makedirs(log_folder, exist_ok=True)
|
||||
log = stat(path.join(log_folder, log_file))
|
||||
if (log.st_size / 1024) > log_size:
|
||||
with open(path.join(log_folder, log_file), 'rb') as f_in:
|
||||
with gzipopen(path.join(log_folder, f'{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz'), 'wb') as f_out:
|
||||
copyfileobj(f_in, f_out)
|
||||
print(f'Copied {path.join(log_folder, datetime.now().strftime("%d.%m.%Y_%H:%M:%S"))}.log.gz')
|
||||
open(path.join(log_folder, log_file), 'w').close()
|
||||
except FileNotFoundError:
|
||||
print(f'Log file {path.join(log_folder, log_file)} does not exist')
|
||||
pass
|
||||
|
||||
# Append string to log
|
||||
def logAppend(message, debug=False):
|
||||
|
||||
global log_folder
|
||||
|
||||
message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] {message}'
|
||||
checkSize(debug=debug)
|
||||
|
||||
if debug:
|
||||
log_file = "debug.log"
|
||||
else:
|
||||
log_file = "latest.log"
|
||||
|
||||
log = open(path.join(log_folder, log_file), 'a')
|
||||
log.write(f'{message_formatted}\n')
|
||||
log.close()
|
||||
|
||||
# Print to stdout and then to log
|
||||
def logWrite(message, debug=False):
|
||||
# save to log file and rotation is to be done
|
||||
logAppend(f'{message}', debug=debug)
|
||||
print(f"{message}", flush=True)
|
114
modules/utils.py
Normal file
114
modules/utils.py
Normal file
@ -0,0 +1,114 @@
|
||||
from ujson import JSONDecodeError as JSONDecodeError
|
||||
from ujson import loads, dumps
|
||||
|
||||
from sys import exit
|
||||
from os import kill
|
||||
from os import name as osname
|
||||
from traceback import print_exc
|
||||
|
||||
from modules.logging import logWrite
|
||||
|
||||
def jsonLoad(filename):
|
||||
"""Loads arg1 as json and returns its contents"""
|
||||
with open(filename, "r", encoding='utf8') as file:
|
||||
try:
|
||||
output = loads(file.read())
|
||||
except JSONDecodeError:
|
||||
logWrite(f"Could not load json file {filename}: file seems to be incorrect!\n{print_exc()}")
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
logWrite(f"Could not load json file {filename}: file does not seem to exist!\n{print_exc()}")
|
||||
raise
|
||||
file.close()
|
||||
return output
|
||||
|
||||
def jsonSave(contents, filename):
|
||||
"""Dumps dict/list arg1 to file arg2"""
|
||||
try:
|
||||
with open(filename, "w", encoding='utf8') as file:
|
||||
file.write(dumps(contents, ensure_ascii=False, indent=4))
|
||||
file.close()
|
||||
except Exception as exp:
|
||||
logWrite(f"Could not save json file {filename}: {exp}\n{print_exc()}")
|
||||
return
|
||||
|
||||
|
||||
def configSet(key: str, value, *args: str):
|
||||
"""Set key to a value
|
||||
Args:
|
||||
* key (str): The last key of the keys path.
|
||||
* value (str/int/float/list/dict/None): Some needed value.
|
||||
* *args (str): Path to key like: dict[args][key].
|
||||
"""
|
||||
this_dict = jsonLoad("config.json")
|
||||
string = "this_dict"
|
||||
for arg in args:
|
||||
string += f'["{arg}"]'
|
||||
if type(value) in [str]:
|
||||
string += f'["{key}"] = "{value}"'
|
||||
else:
|
||||
string += f'["{key}"] = {value}'
|
||||
exec(string)
|
||||
jsonSave(this_dict, "config.json")
|
||||
return
|
||||
|
||||
def configGet(key: str, *args: str):
|
||||
"""Get value of the config key
|
||||
Args:
|
||||
* key (str): The last key of the keys path.
|
||||
* *args (str): Path to key like: dict[args][key].
|
||||
Returns:
|
||||
* any: Value of provided key
|
||||
"""
|
||||
this_dict = jsonLoad("config.json")
|
||||
this_key = this_dict
|
||||
for dict_key in args:
|
||||
this_key = this_key[dict_key]
|
||||
return this_key[key]
|
||||
|
||||
# def locale(key: str, *args: str, locale=configGet("locale")):
|
||||
# """Get value of locale string
|
||||
# Args:
|
||||
# * key (str): The last key of the locale's keys path.
|
||||
# * *args (list): Path to key like: dict[args][key].
|
||||
# * locale (str): Locale to looked up in. Defaults to config's locale value.
|
||||
# Returns:
|
||||
# * any: Value of provided locale key
|
||||
# """
|
||||
# if (locale == None):
|
||||
# locale = configGet("locale")
|
||||
|
||||
# try:
|
||||
# this_dict = jsonLoad(f'{configGet("locale", "locations")}{sep}{locale}.json')
|
||||
# except FileNotFoundError:
|
||||
# try:
|
||||
# this_dict = jsonLoad(f'{configGet("locale", "locations")}{sep}{configGet("locale")}.json')
|
||||
# except FileNotFoundError:
|
||||
# try:
|
||||
# this_dict = jsonLoad(f'{configGet("locale_fallback", "locations")}{sep}{configGet("locale")}.json')
|
||||
# except:
|
||||
# return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
|
||||
|
||||
# this_key = this_dict
|
||||
# for dict_key in args:
|
||||
# this_key = this_key[dict_key]
|
||||
|
||||
# try:
|
||||
# return this_key[key]
|
||||
# except KeyError:
|
||||
# return f'⚠️ Locale in config is invalid: could not get "{key}" in {str(args)} from locale "{locale}"'
|
||||
|
||||
try:
|
||||
from psutil import Process
|
||||
except ModuleNotFoundError:
|
||||
# print(locale("deps_missing", "console", locale=configGet("locale")), flush=True)
|
||||
print("Missing dependencies! Please install all needed dependencies and run the bot again!")
|
||||
exit()
|
||||
|
||||
def killProc(pid):
|
||||
if osname == "posix":
|
||||
from signal import SIGKILL # type: ignore
|
||||
kill(pid, SIGKILL)
|
||||
else:
|
||||
p = Process(pid)
|
||||
p.kill()
|
4
requirements.txt
Normal file
4
requirements.txt
Normal file
@ -0,0 +1,4 @@
|
||||
pyrogram>=2.0.59
|
||||
tgcrypto>=1.2.4
|
||||
ujson>=5.5.0
|
||||
psutil>=5.9.2
|
Reference in New Issue
Block a user