"""Background jobs of the bot, driven by an ``AsyncIOScheduler``.

Jobs defined here:

* ``cache_group_members`` / ``cache_admins`` - dump member ids of the
  configured groups into JSON files under ``cache``;
* ``cache_avatars`` - download avatars of destination group members
  (only when enabled in the config);
* ``check_birthdays`` - tell the admin group about today's birthdays
  (only when enabled in the config);
* ``check_sponsors`` - warn about soon-to-expire sponsorships and clean up
  expired ones (only when enabled in the config);
* ``commands_register`` - one-shot registration of bot commands.
"""

from os import listdir, makedirs, path, sep
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta
from app import app
from pyrogram.types import BotCommand, BotCommandScopeChat
from pyrogram.errors import bad_request_400
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from classes.holo_user import HoloUser
from modules.utils import configGet, jsonSave, locale, logWrite
from dateutil.relativedelta import relativedelta
from modules.database import col_applications, col_sponsorships

scheduler = AsyncIOScheduler()


async def _cache_chat_member_ids(chat_id, cache_name):
    """Save ids of all members of ``chat_id`` to ``cache/<cache_name>`` via ``jsonSave``."""
    member_ids = [member.user.id async for member in app.get_chat_members(chat_id)]
    makedirs("cache", exist_ok=True)
    jsonSave(member_ids, f"cache{sep}{cache_name}")


def _bot_commands(key, lc=None):
    """Build a list of ``BotCommand`` from the locale mapping stored under ``key``.

    The locale entry is read once per call instead of once per command.
    When ``lc`` is given it is forwarded as the ``locale`` keyword argument.
    """
    entries = locale(key) if lc is None else locale(key, locale=lc)
    return [BotCommand(command, description) for command, description in entries.items()]


@scheduler.scheduled_job(trigger="interval", seconds=20)
async def cache_group_members():
    """Refresh the cached list of destination group member ids."""
    await _cache_chat_member_ids(configGet("destination_group"), "group_members")


@scheduler.scheduled_job(trigger="interval", seconds=40)
async def cache_admins():
    """Refresh the cached list of admin group member ids."""
    await _cache_chat_member_ids(configGet("admin_group"), "admins")


# Cache the avatars of group members
if configGet("enabled", "scheduler", "cache_avatars"):

    @scheduler.scheduled_job(trigger="date", run_date=datetime.now() + timedelta(seconds=10))
    @scheduler.scheduled_job(trigger="interval", hours=configGet("interval", "scheduler", "cache_avatars"))
    async def cache_avatars():
        """Download avatars of destination group members that are not on disk yet."""
        avatars_dir = f'{configGet("cache", "locations")}{sep}avatars'

        members = [
            member.user
            async for member in app.get_chat_members(
                configGet("destination_group"),
                filter=ChatMembersFilter.SEARCH,
                query="",
            )
        ]

        for user in members:
            if user.photo is None:
                continue
            avatar_path = f"{avatars_dir}{sep}{user.photo.big_file_id}"
            if path.exists(avatar_path):
                continue
            print(f'Pre-cached avatar {user.photo.big_file_id} of {user.id}', flush=True)
            await app.download_media(user.photo.big_file_id, file_name=avatar_path)

        logWrite("Avatars caching performed")


# Check for birthdays
if configGet("enabled", "scheduler", "birthdays"):

    @scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "birthdays"))
    async def check_birthdays():
        """Notify the admin group about every applicant whose birthday is today."""
        today = datetime.now()

        for entry in col_applications.find():
            # Field "2" of the application is used as the date of birth here.
            birthday = entry["application"]["2"]
            if (birthday.day, birthday.month) != (today.day, today.month):
                continue
            try:
                tg_user = await app.get_users(entry["user"])
                # relativedelta(dt1, dt2) is dt1 - dt2, so .years is the age.
                age = relativedelta(today, birthday).years
                await app.send_message(
                    configGet("admin_group"),
                    locale("birthday", "message").format(
                        str(tg_user.first_name), str(tg_user.username), str(age)
                    ),
                )  # type: ignore
                logWrite(f"Notified admins about {entry['user']}'s birthday")
            except Exception as exp:
                # Best effort: one failing user must not stop the whole check.
                logWrite(f"Could not find user {entry['user']} to send a message about birthday due to '{exp}'")
                continue

        logWrite("Birthdays check performed")


# Check for expired sponsorships
if configGet("enabled", "scheduler", "sponsorships"):

    @scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "sponsorships"))
    async def check_sponsors():
        """Warn users about sponsorships expiring soon and clean up expired ones.

        First pass: every sponsorship expiring earlier than two days from now
        gets an "expires in N days" message. Second pass: every sponsorship
        that has already expired gets an "expired" message, the user's label
        in the destination group is reset and the sponsorship record is
        deleted.
        """
        for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now() + timedelta(days=2)}}):
            try:
                tg_user = await app.get_users(entry["user"])
                # expires - now (not now - expires), so the number of days is
                # positive for a sponsorship that is still active; clamp at 0
                # for records that are already past their expiry.
                until_expiry = max(
                    relativedelta(entry["sponsorship"]["expires"], datetime.now()).days,
                    0,
                )
                # send_message expects a chat id, not a User object.
                await app.send_message(
                    tg_user.id,
                    locale("sponsorships_expires", "message").format(until_expiry),
                )  # type: ignore
                logWrite(f"Notified user that sponsorship expires in {until_expiry} days")
            except Exception as exp:
                logWrite(f"Could not find user {entry['user']} notify about sponsorship expiry due to '{exp}'")
                continue

        for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()}}):
            try:
                holo_user = HoloUser(entry["user"])
                await app.send_message(
                    entry["user"],
                    locale("sponsorships_expired", "message"),
                )  # type: ignore
                await holo_user.label_reset(configGet("destination_group"))
                col_sponsorships.find_one_and_delete({"user": holo_user.id})
                try:
                    # Result is not needed; the call only decides which of the
                    # two log lines below gets written.
                    await app.get_users(entry["user"])
                    logWrite("Notified user that sponsorship expired")
                except Exception as exp:
                    logWrite(f"Could not find user {entry['user']} notify about sponsorship expired due to '{exp}'")
            except Exception as exp:
                logWrite(f"Could not reset label of user {entry['user']} due to '{exp}'")
            continue

        logWrite("Sponsorships check performed")


# Register all bot commands
@scheduler.scheduled_job(trigger="date", run_date=datetime.now() + timedelta(seconds=3))
async def commands_register():
    """Register user, admin, owner and group command lists with Telegram."""
    # Every "<name>.json" file in the locale directory is treated as a locale.
    valid_locales = [
        entry.rsplit(".", 1)[0]
        for entry in listdir(f'{configGet("locale", "locations")}')
        if entry.endswith(".json")
    ]

    # Registering user commands
    await app.set_bot_commands(_bot_commands("commands"))
    logWrite("Registered user commands for default locale")

    # Registering user commands for each locale
    for lc in valid_locales:
        await app.set_bot_commands(_bot_commands("commands", lc), language_code=lc)
        logWrite(f"Registered user commands for locale {lc}")

    # Registering admin commands (user commands followed by admin-only ones)
    commands_admin_list = _bot_commands("commands") + _bot_commands("commands_admin")

    for admin in configGet("admins"):
        try:
            await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin))
            logWrite(f"Registered admin commands for admin {admin}")
        except bad_request_400.PeerIdInvalid:
            # Still best effort, but leave a trace instead of failing silently.
            logWrite(f"Could not register commands for admin {admin}. Perhaps user has not started the bot yet.")

    try:
        await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner")))
        logWrite(f"Registered admin commands for owner {configGet('owner')}")
    except bad_request_400.PeerIdInvalid:
        logWrite("Could not register commands for bot owner. Perhaps user has not started the bot yet.")

    # Registering admin group commands
    try:
        await app.set_bot_commands(
            _bot_commands("commands_group_admin"),
            scope=BotCommandScopeChat(chat_id=configGet("admin_group")),
        )
        logWrite("Registered admin group commands for default locale")
    except bad_request_400.ChannelInvalid:
        logWrite("Could not register commands for admin group. Bot is likely not in the group.")

    # Registering destination group commands
    try:
        await app.set_bot_commands(
            _bot_commands("commands_group_destination"),
            scope=BotCommandScopeChat(chat_id=configGet("destination_group")),
        )
        logWrite("Registered destination group commands")
    except bad_request_400.ChannelInvalid:
        logWrite("Could not register commands for destination group. Bot is likely not in the group.")