"""Automatically register commands and execute some scheduled tasks is the main idea of this module""" from os import listdir, makedirs, path, sep from apscheduler.schedulers.asyncio import AsyncIOScheduler from datetime import datetime, timedelta from ujson import dumps from app import app from pyrogram.types import BotCommand, BotCommandScopeChat, BotCommandScopeChatAdministrators from pyrogram.errors import bad_request_400 from pyrogram.enums.chat_members_filter import ChatMembersFilter from classes.holo_user import HoloUser from modules.utils import configGet, jsonSave, locale, logWrite from dateutil.relativedelta import relativedelta from modules.database import col_applications, col_sponsorships scheduler = AsyncIOScheduler() if configGet("enabled", "scheduler", "cache_members"): @scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_members")) async def cache_group_members(): list_of_users = [] async for member in app.get_chat_members(configGet("users", "groups")): list_of_users.append(member.user.id) makedirs(configGet("cache", "locations"), exist_ok=True) jsonSave(list_of_users, path.join(configGet("cache", "locations"), "group_members")) logWrite("User group caching performed", debug=True) if configGet("enabled", "scheduler", "cache_admins"): @scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_admins")) async def cache_admins(): list_of_users = [] async for member in app.get_chat_members(configGet("admin", "groups")): list_of_users.append(member.user.id) makedirs(configGet("cache", "locations"), exist_ok=True) jsonSave(list_of_users, path.join(configGet("cache", "locations"), "admins")) logWrite("Admin group caching performed", debug=True) # Cache the avatars of group members if configGet("enabled", "scheduler", "cache_avatars"): @scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=10)) @scheduler.scheduled_job(trigger="interval", hours=configGet("interval", "scheduler", "cache_avatars")) async def cache_avatars(): list_of_users = [] async for member in app.get_chat_members(configGet("users", "groups"), filter=ChatMembersFilter.SEARCH, query=""): list_of_users.append(member.user) for user in list_of_users: if user.photo != None: if not path.exists(f'{configGet("cache", "locations")}{sep}avatars{sep}{user.photo.big_file_id}'): print(f'Pre-cached avatar {user.photo.big_file_id} of {user.id}', flush=True) await app.download_media(user.photo.big_file_id, file_name=path.join(configGet("cache", "locations"), "avatars", user.photo.big_file_id)) logWrite("Avatars caching performed") # Check for birthdays if configGet("enabled", "features", "applications") is True: if configGet("enabled", "scheduler", "birthdays") is True: @scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "birthdays")) async def check_birthdays(): for entry in col_applications.find(): if entry["application"]["2"].strftime("%d.%m") == datetime.now().strftime("%d.%m"): try: tg_user = await app.get_users(entry["user"]) await app.send_message( configGet("admin", "groups"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(relativedelta(datetime.now(), entry["application"]["2"], '%d.%m.%Y').years)) ) # type: ignore logWrite(f"Notified admins about {entry['user']}'s birthday") except Exception as exp: logWrite(f"Could not find user {entry['user']} to send a message about birthday due to '{exp}'") continue logWrite("Birthdays check performed") # Check for expired sponsorships if configGet("enabled", "features", "sponsorships") is True: if configGet("enabled", "scheduler", "sponsorships") is True: @scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "sponsorships")) async def check_sponsors(): for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()+timedelta(days=2)}}): try: tg_user = await app.get_users(entry["user"]) until_expiry = relativedelta(datetime.now(), entry["sponsorship"]["expires"]).days await app.send_message( tg_user.id, locale("sponsorships_expires", "message").format(until_expiry) ) # type: ignore logWrite(f"Notified user that sponsorship expires in {until_expiry} days") except Exception as exp: logWrite(f"Could not find user {entry['user']} notify about sponsorship expiry due to '{exp}'") continue for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()}}): try: holo_user = HoloUser(entry["user"]) await app.send_message( entry["user"], locale("sponsorships_expired", "message") ) # type: ignore await holo_user.label_reset(configGet("users", "groups")) col_sponsorships.find_one_and_delete({"user": holo_user.id}) try: tg_user = await app.get_users(entry["user"]) logWrite(f"Notified user that sponsorship expired") except Exception as exp: logWrite(f"Could not find user {entry['user']} notify about sponsorship expired due to '{exp}'") except Exception as exp: logWrite(f"Could not reset label of user {entry['user']} due to '{exp}'") continue logWrite("Sponsorships check performed") # Register all bot commands @scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=3)) async def commands_register(): commands = { "users": [], "admins": [], "owner": [], "group_users": [], "group_admins": [], "group_users_admins": [], "locales": {} } commands_raw = { "users": [], "admins": [], "owner": [], "group_users": [], "group_admins": [], "group_users_admins": [], "locales": {} } valid_locales = [] files_locales = listdir(f'{configGet("locale", "locations")}') for entry in files_locales: if entry.endswith(".json"): valid_locales.append(".".join(entry.split(".")[:-1])) commands["locales"][".".join(entry.split(".")[:-1])] = { "users": [], "admins": [], "owner": [], "group_users": [], "group_admins": [], "group_users_admins": [] } if configGet("debug") is True: commands_raw["locales"][".".join(entry.split(".")[:-1])] = { "users": [], "admins": [], "owner": [], "group_users": [], "group_admins": [], "group_users_admins": [] } config_modules = configGet("features") config_commands = configGet("commands") for command in config_commands: enabled = False for module in config_commands[command]["modules"]: if config_modules[module]["enabled"] is True: enabled = True if enabled is False: logWrite(f"Not registering {command} at all", debug=True) continue for permission in config_commands[command]["permissions"]: commands[permission].append(BotCommand(command, locale("commands")[command])) if configGet("debug") is True: commands_raw[permission].append({f"{command}": locale("commands")[command]}) logWrite(f"Registering {command} for {permission}") for lc in valid_locales: commands["locales"][lc][permission].append(BotCommand(command, locale("commands", locale=lc)[command])) if configGet("debug") is True: commands_raw["locales"][lc][permission].append({f"{command}": locale("commands", locale=lc)[command]}) logWrite(f"Registering {command} for {permission} [{lc}]") # Registering user commands await app.set_bot_commands(commands["users"]) logWrite("Registered user commands for default locale") # Registering user commands for each locale for lc in valid_locales: await app.set_bot_commands(commands["locales"][lc]["users"], language_code=lc) logWrite(f"Registered user commands for locale {lc}") # Registering admin commands for admin in configGet("admins"): try: await app.set_bot_commands(commands["admins"]+commands["users"], scope=BotCommandScopeChat(chat_id=admin)) logWrite(f"Registered admin commands for admin {admin}") except bad_request_400.PeerIdInvalid: pass # Registering owner commands try: await app.set_bot_commands(commands["admins"]+commands["owner"]+commands["users"], scope=BotCommandScopeChat(chat_id=configGet("owner"))) for lc in valid_locales: await app.set_bot_commands(commands["locales"][lc]["admins"]+commands["locales"][lc]["owner"]+commands["locales"][lc]["users"], scope=BotCommandScopeChat(chat_id=configGet("owner"))) logWrite(f"Registered admin commands for owner {configGet('owner')}") except bad_request_400.PeerIdInvalid: pass # Registering admin group commands try: await app.set_bot_commands(commands["group_admins"], scope=BotCommandScopeChat(chat_id=configGet("admin", "groups"))) logWrite("Registered admin group commands for default locale") except bad_request_400.ChannelInvalid: logWrite(f"Could not register commands for admin group. Bot is likely not in the group.") # Registering destination group commands try: await app.set_bot_commands(commands["group_users"], scope=BotCommandScopeChat(chat_id=configGet("users", "groups"))) logWrite("Registered destination group commands") except bad_request_400.ChannelInvalid: logWrite(f"Could not register commands for destination group. Bot is likely not in the group.") # Registering destination group admin commands try: await app.set_bot_commands(commands["group_users_admins"], scope=BotCommandScopeChatAdministrators(chat_id=configGet("users", "groups"))) logWrite("Registered destination group admin commands") except bad_request_400.ChannelInvalid: logWrite(f"Could not register admin commands for destination group. Bot is likely not in the group.") if configGet("debug") is True: print(commands, flush=True) logWrite(f"Complete commands registration:\n{dumps(commands_raw, indent=4, ensure_ascii=False, encode_html_chars=False)}", debug=True)