162 lines
8.4 KiB
Python
162 lines
8.4 KiB
Python
"""Automatically register commands and execute
|
|
some scheduled tasks is the main idea of this module"""
|
|
|
|
from os import listdir, makedirs, path, sep
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from datetime import datetime, timedelta
|
|
from app import app
|
|
from pyrogram.types import BotCommand, BotCommandScopeChat
|
|
from pyrogram.errors import bad_request_400
|
|
from pyrogram.enums.chat_members_filter import ChatMembersFilter
|
|
from classes.holo_user import HoloUser
|
|
from modules.utils import configGet, jsonSave, locale, logWrite
|
|
from dateutil.relativedelta import relativedelta
|
|
from modules.database import col_applications, col_sponsorships
|
|
|
|
scheduler = AsyncIOScheduler()
|
|
|
|
if configGet("enabled", "scheduler", "cache_members"):
|
|
@scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_members"))
|
|
async def cache_group_members():
|
|
list_of_users = []
|
|
async for member in app.get_chat_members(configGet("destination_group")):
|
|
list_of_users.append(member.user.id)
|
|
makedirs("cache", exist_ok=True)
|
|
jsonSave(list_of_users, f"cache{sep}group_members")
|
|
|
|
if configGet("enabled", "scheduler", "cache_admins"):
|
|
@scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_admins"))
|
|
async def cache_admins():
|
|
list_of_users = []
|
|
async for member in app.get_chat_members(configGet("admin_group")):
|
|
list_of_users.append(member.user.id)
|
|
makedirs("cache", exist_ok=True)
|
|
jsonSave(list_of_users, f"cache{sep}admins")
|
|
|
|
# Cache the avatars of group members
|
|
if configGet("enabled", "scheduler", "cache_avatars"):
|
|
@scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=10))
|
|
@scheduler.scheduled_job(trigger="interval", hours=configGet("interval", "scheduler", "cache_avatars"))
|
|
async def cache_avatars():
|
|
list_of_users = []
|
|
async for member in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=""):
|
|
list_of_users.append(member.user)
|
|
for user in list_of_users:
|
|
if user.photo != None:
|
|
if not path.exists(f'{configGet("cache", "locations")}{sep}avatars{sep}{user.photo.big_file_id}'):
|
|
print(f'Pre-cached avatar {user.photo.big_file_id} of {user.id}', flush=True)
|
|
await app.download_media(user.photo.big_file_id, file_name=f'{configGet("cache", "locations")}{sep}avatars{sep}{user.photo.big_file_id}')
|
|
logWrite("Avatars caching performed")
|
|
|
|
|
|
# Check for birthdays
|
|
if configGet("enabled", "scheduler", "birthdays"):
|
|
@scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "birthdays"))
|
|
async def check_birthdays():
|
|
for entry in col_applications.find():
|
|
if entry["application"]["2"].strftime("%d.%m") == datetime.now().strftime("%d.%m"):
|
|
try:
|
|
tg_user = await app.get_users(entry["user"])
|
|
await app.send_message( configGet("admin_group"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(relativedelta(datetime.now(), entry["application"]["2"], '%d.%m.%Y').years)) ) # type: ignore
|
|
logWrite(f"Notified admins about {entry['user']}'s birthday")
|
|
except Exception as exp:
|
|
logWrite(f"Could not find user {entry['user']} to send a message about birthday due to '{exp}'")
|
|
continue
|
|
logWrite("Birthdays check performed")
|
|
|
|
|
|
# Check for expired sponsorships
|
|
if configGet("enabled", "scheduler", "sponsorships"):
|
|
@scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "sponsorships"))
|
|
async def check_sponsors():
|
|
for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()+timedelta(days=2)}}):
|
|
try:
|
|
tg_user = await app.get_users(entry["user"])
|
|
until_expiry = relativedelta(datetime.now(), entry["sponsorship"]["expires"]).days
|
|
await app.send_message( tg_user, locale("sponsorships_expires", "message").format(until_expiry) ) # type: ignore
|
|
logWrite(f"Notified user that sponsorship expires in {until_expiry} days")
|
|
except Exception as exp:
|
|
logWrite(f"Could not find user {entry['user']} notify about sponsorship expiry due to '{exp}'")
|
|
continue
|
|
for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()}}):
|
|
try:
|
|
holo_user = HoloUser(entry["user"])
|
|
await app.send_message( entry["user"], locale("sponsorships_expired", "message") ) # type: ignore
|
|
await holo_user.label_reset(configGet("destination_group"))
|
|
col_sponsorships.find_one_and_delete({"user": holo_user.id})
|
|
try:
|
|
tg_user = await app.get_users(entry["user"])
|
|
logWrite(f"Notified user that sponsorship expired")
|
|
except Exception as exp:
|
|
logWrite(f"Could not find user {entry['user']} notify about sponsorship expired due to '{exp}'")
|
|
except Exception as exp:
|
|
logWrite(f"Could not reset label of user {entry['user']} due to '{exp}'")
|
|
continue
|
|
logWrite("Sponsorships check performed")
|
|
|
|
|
|
# Register all bot commands
|
|
@scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=3))
|
|
async def commands_register():
|
|
|
|
valid_locales = []
|
|
files_locales = listdir(f'{configGet("locale", "locations")}')
|
|
for entry in files_locales:
|
|
if entry.endswith(".json"):
|
|
valid_locales.append(".".join(entry.split(".")[:-1]))
|
|
|
|
# Registering user commands
|
|
commands_list = []
|
|
for command in locale("commands"):
|
|
commands_list.append(BotCommand(command, locale("commands")[command]))
|
|
await app.set_bot_commands(commands_list)
|
|
logWrite("Registered user commands for default locale")
|
|
|
|
# Registering user commands for each locale
|
|
for lc in valid_locales:
|
|
commands_list = []
|
|
for command in locale("commands", locale=lc):
|
|
commands_list.append(BotCommand(command, locale("commands",locale=lc)[command]))
|
|
await app.set_bot_commands(commands_list, language_code=lc)
|
|
logWrite(f"Registered user commands for locale {lc}")
|
|
|
|
# Registering admin commands
|
|
commands_admin_list = []
|
|
for command in locale("commands"):
|
|
commands_admin_list.append(BotCommand(command, locale("commands")[command]))
|
|
|
|
for command in locale("commands_admin"):
|
|
commands_admin_list.append(BotCommand(command, locale("commands_admin")[command]))
|
|
|
|
for admin in configGet("admins"):
|
|
try:
|
|
await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin))
|
|
logWrite(f"Registered admin commands for admin {admin}")
|
|
except bad_request_400.PeerIdInvalid:
|
|
pass
|
|
|
|
try:
|
|
await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner")))
|
|
logWrite(f"Registered admin commands for owner {configGet('owner')}")
|
|
except bad_request_400.PeerIdInvalid:
|
|
logWrite(f"Could not register commands for bot owner. Perhaps user has not started the bot yet.")
|
|
|
|
# Registering admin group commands
|
|
commands_group_admin_list = []
|
|
for command in locale("commands_group_admin"):
|
|
commands_group_admin_list.append(BotCommand(command, locale("commands_group_admin")[command]))
|
|
try:
|
|
await app.set_bot_commands(commands_group_admin_list, scope=BotCommandScopeChat(chat_id=configGet("admin_group")))
|
|
logWrite("Registered admin group commands for default locale")
|
|
except bad_request_400.ChannelInvalid:
|
|
logWrite(f"Could not register commands for admin group. Bot is likely not in the group.")
|
|
|
|
# Registering destination group commands
|
|
commands_group_destination_list = []
|
|
for command in locale("commands_group_destination"):
|
|
commands_group_destination_list.append(BotCommand(command, locale("commands_group_destination")[command]))
|
|
try:
|
|
await app.set_bot_commands(commands_group_destination_list, scope=BotCommandScopeChat(chat_id=configGet("destination_group")))
|
|
logWrite("Registered destination group commands")
|
|
except bad_request_400.ChannelInvalid:
|
|
logWrite(f"Could not register commands for destination group. Bot is likely not in the group.") |