Telegram/modules/scheduled.py

178 lines
8.6 KiB
Python

"""Automatically register commands and execute
scheduled tasks (member and avatar caching, birthday and sponsorship checks)."""
from os import listdir, makedirs, path, sep
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta
from app import app
from pyrogram.types import BotCommand, BotCommandScopeChat
from pyrogram.errors import bad_request_400
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from classes.holo_user import HoloUser
from modules.utils import configGet, jsonSave, locale, logWrite
from dateutil.relativedelta import relativedelta
from modules.database import col_applications, col_sponsorships
# Scheduler instance shared by this module; the jobs below attach themselves
# to it through the `scheduler.scheduled_job` decorator.
scheduler = AsyncIOScheduler()
if configGet("enabled", "scheduler", "cache_members"):

    @scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_members"))
    async def cache_group_members():
        """Save the ids of the members of the configured chat to the cache.

        Iterates over `app.get_chat_members()` for the chat stored under the
        ("admin", "users") config key and writes the collected user ids to
        the `group_members` file inside the configured cache location.
        """
        cache_dir = configGet("cache", "locations")
        member_ids = [
            member.user.id
            async for member in app.get_chat_members(configGet("admin", "users"))
        ]
        # Create the directory that is actually written to: a hard-coded
        # "cache" path may differ from the configured cache location.
        makedirs(cache_dir, exist_ok=True)
        jsonSave(member_ids, path.join(cache_dir, "group_members"))
if configGet("enabled", "scheduler", "cache_admins"):

    @scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_admins"))
    async def cache_admins():
        """Save the ids of the members of the admin chat to the cache.

        Iterates over `app.get_chat_members()` for the chat stored under the
        ("admin", "groups") config key and writes the collected user ids to
        the `admins` file inside the configured cache location.
        """
        cache_dir = configGet("cache", "locations")
        admin_ids = [
            member.user.id
            async for member in app.get_chat_members(configGet("admin", "groups"))
        ]
        # Create the directory that is actually written to: a hard-coded
        # "cache" path may differ from the configured cache location.
        makedirs(cache_dir, exist_ok=True)
        jsonSave(admin_ids, path.join(cache_dir, "admins"))
# Cache the avatars of group members
if configGet("enabled", "scheduler", "cache_avatars"):

    @scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=10))
    @scheduler.scheduled_job(trigger="interval", hours=configGet("interval", "scheduler", "cache_avatars"))
    async def cache_avatars():
        """Download the avatars of chat members that are not cached yet.

        The job is registered twice: once with a run date ten seconds after
        this code is evaluated and once as a recurring job with the
        configured interval in hours. Avatars are stored under
        `<cache location>/avatars/<big_file_id>`; files that already exist
        are skipped.
        """
        avatars_dir = path.join(configGet("cache", "locations"), "avatars")

        # Collect the users first so the member listing is finished before
        # any download starts.
        users = [
            member.user
            async for member in app.get_chat_members(configGet("admin", "users"), filter=ChatMembersFilter.SEARCH, query="")
        ]

        for user in users:
            if user.photo is None:
                continue
            # One path for both the existence check and the download target.
            avatar_path = path.join(avatars_dir, user.photo.big_file_id)
            if path.exists(avatar_path):
                continue
            await app.download_media(user.photo.big_file_id, file_name=avatar_path)
            # Report only once the download has actually completed.
            print(f'Pre-cached avatar {user.photo.big_file_id} of {user.id}', flush=True)

        logWrite("Avatars caching performed")
# Check for birthdays
if configGet("enabled", "features", "applications") is True and configGet("enabled", "scheduler", "birthdays") is True:

    @scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "birthdays"))
    async def check_birthdays():
        """Notify the admin chat about users whose birthday is today.

        Goes through every document of `col_applications`, compares the day
        and month of the date stored under `application["2"]` with today's
        date and, on a match, sends the localized "birthday" message (first
        name, username, age in years) to the chat stored under the
        ("admin", "groups") config key. Failures for a single user are
        logged and do not stop the check.
        """
        # Take "today" once so that every entry is compared against the same
        # date, even if the job runs across midnight.
        today = datetime.now()

        for entry in col_applications.find():
            birthday = entry["application"]["2"]
            if (birthday.day, birthday.month) != (today.day, today.month):
                continue
            try:
                tg_user = await app.get_users(entry["user"])
                # relativedelta(dt1, dt2) takes no format string; a third
                # positional argument would be bound to `years`.
                age = relativedelta(today, birthday).years
                await app.send_message(configGet("admin", "groups"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(age)))  # type: ignore
                logWrite(f"Notified admins about {entry['user']}'s birthday")
            except Exception as exp:
                logWrite(f"Could not find user {entry['user']} to send a message about birthday due to '{exp}'")

        logWrite("Birthdays check performed")
# Check for expired sponsorships
if configGet("enabled", "features", "sponsorships") is True and configGet("enabled", "scheduler", "sponsorships") is True:

    @scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "sponsorships"))
    async def check_sponsors():
        """Warn about sponsorships that expire soon and revoke expired ones.

        First pass: every sponsorship that is still valid but expires within
        the next two days gets the localized "sponsorships_expires" message
        with the number of days left.

        Second pass: every sponsorship whose expiry date has passed gets the
        localized "sponsorships_expired" message, after which the user's
        label is reset in the chat stored under the ("admin", "users")
        config key and the sponsorship document is deleted.

        Failures for a single user are logged and do not stop the check.
        """
        # A single reference time keeps the two passes disjoint.
        now = datetime.now()

        # Still valid, but expiring within two days. Already expired entries
        # are excluded here because the second pass handles them.
        for entry in col_sponsorships.find({"sponsorship.expires": {"$gte": now, "$lt": now+timedelta(days=2)}}):
            try:
                tg_user = await app.get_users(entry["user"])
                # Expiry minus now, so the remaining time is positive.
                until_expiry = relativedelta(entry["sponsorship"]["expires"], now).days
                # send_message expects a chat id, not a User object.
                await app.send_message(tg_user.id, locale("sponsorships_expires", "message").format(until_expiry))  # type: ignore
                logWrite(f"Notified user that sponsorship expires in {until_expiry} days")
            except Exception as exp:
                logWrite(f"Could not find user {entry['user']} notify about sponsorship expiry due to '{exp}'")

        # Already expired
        for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": now}}):
            # The notification is best-effort: a user that cannot be reached
            # must not keep the expired sponsorship alive forever.
            try:
                await app.send_message(entry["user"], locale("sponsorships_expired", "message"))  # type: ignore
                logWrite("Notified user that sponsorship expired")
            except Exception as exp:
                logWrite(f"Could not find user {entry['user']} notify about sponsorship expired due to '{exp}'")

            try:
                holo_user = HoloUser(entry["user"])
                await holo_user.label_reset(configGet("admin", "users"))
                col_sponsorships.find_one_and_delete({"user": holo_user.id})
            except Exception as exp:
                logWrite(f"Could not reset label of user {entry['user']} due to '{exp}'")

        logWrite("Sponsorships check performed")
# Register all bot commands
@scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=3))
async def commands_register():
    """Build the bot command lists from the config and register them.

    Commands are read from the "commands" config section. A command is used
    only when at least one of the modules listed for it is enabled in the
    "features" section. Each command is added to every permission list it
    names ("users", "admins", "group_users", "group_admins"), both with the
    default locale description and with the description of every locale
    found as a `.json` file in the configured locale directory.

    Registration order: default user commands, per-locale user commands,
    admin commands for every admin and the owner, admin chat commands,
    destination chat commands.
    """
    scopes = ("users", "admins", "group_users", "group_admins")

    commands = {scope: [] for scope in scopes}
    commands["locales"] = {}

    # Every "<name>.json" file in the locale directory is a locale "<name>"
    valid_locales = []
    for entry in listdir(configGet("locale", "locations")):
        if not entry.endswith(".json"):
            continue
        locale_name = entry.rsplit(".", 1)[0]
        valid_locales.append(locale_name)
        commands["locales"][locale_name] = {scope: [] for scope in scopes}

    config_modules = configGet("features")
    config_commands = configGet("commands")

    for command, command_config in config_commands.items():

        # Skip commands none of whose modules is enabled
        if not any(config_modules[module]["enabled"] is True for module in command_config["modules"]):
            continue

        for permission in command_config["permissions"]:
            commands[permission].append(BotCommand(command, locale("commands")[command]))
            for lc in valid_locales:
                commands["locales"][lc][permission].append(BotCommand(command, locale("commands", locale=lc)[command]))

    # Registering user commands
    await app.set_bot_commands(commands["users"])
    logWrite("Registered user commands for default locale")

    # Registering user commands for each locale
    for lc in valid_locales:
        await app.set_bot_commands(commands["locales"][lc]["users"], language_code=lc)
        logWrite(f"Registered user commands for locale {lc}")

    # Registering admin/owner commands
    owner = configGet("owner")
    for admin in configGet("admins")+[owner]:
        try:
            await app.set_bot_commands(commands["admins"], scope=BotCommandScopeChat(chat_id=admin))
        except bad_request_400.PeerIdInvalid:
            # Keep going with the remaining admins, but leave a trace instead
            # of dropping the failure silently.
            logWrite(f"Could not register admin commands for {admin}: peer id is invalid")
        else:
            if admin == owner:
                logWrite(f"Registered admin commands for owner {owner}")
            else:
                logWrite(f"Registered admin commands for admin {admin}")

    # Registering admin group commands
    try:
        await app.set_bot_commands(commands["group_admins"], scope=BotCommandScopeChat(chat_id=configGet("admin", "groups")))
        logWrite("Registered admin group commands for default locale")
    except bad_request_400.ChannelInvalid:
        logWrite("Could not register commands for admin group. Bot is likely not in the group.")

    # Registering destination group commands
    try:
        await app.set_bot_commands(commands["group_users"], scope=BotCommandScopeChat(chat_id=configGet("admin", "users")))
        logWrite("Registered destination group commands")
    except bad_request_400.ChannelInvalid:
        logWrite("Could not register commands for destination group. Bot is likely not in the group.")