This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/modules/scheduled.py

162 lines
8.4 KiB
Python
Raw Normal View History

2023-01-03 16:12:46 +02:00
"""Automatically register commands and execute
some scheduled tasks is the main idea of this module"""
2022-12-30 21:36:06 +02:00
from os import listdir, makedirs, path, sep
2022-12-11 20:51:46 +02:00
from apscheduler.schedulers.asyncio import AsyncIOScheduler
2022-12-18 00:25:53 +02:00
from datetime import datetime, timedelta
2022-12-10 16:52:34 +02:00
from app import app
2022-12-18 00:36:39 +02:00
from pyrogram.types import BotCommand, BotCommandScopeChat
from pyrogram.errors import bad_request_400
2022-12-18 00:25:53 +02:00
from pyrogram.enums.chat_members_filter import ChatMembersFilter
2022-12-22 16:05:27 +02:00
from classes.holo_user import HoloUser
2022-12-30 21:36:06 +02:00
from modules.utils import configGet, jsonSave, locale, logWrite
2022-12-10 16:52:34 +02:00
from dateutil.relativedelta import relativedelta
2022-12-22 16:05:27 +02:00
from modules.database import col_applications, col_sponsorships
2022-12-10 16:52:34 +02:00
2022-12-11 20:51:46 +02:00
scheduler = AsyncIOScheduler()
2022-12-10 16:52:34 +02:00
if configGet("enabled", "scheduler", "cache_members"):
@scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_members"))
async def cache_group_members():
list_of_users = []
async for member in app.get_chat_members(configGet("destination_group")):
list_of_users.append(member.user.id)
makedirs("cache", exist_ok=True)
jsonSave(list_of_users, f"cache{sep}group_members")
2022-12-30 21:36:06 +02:00
if configGet("enabled", "scheduler", "cache_admins"):
@scheduler.scheduled_job(trigger="interval", seconds=configGet("interval", "scheduler", "cache_admins"))
async def cache_admins():
list_of_users = []
async for member in app.get_chat_members(configGet("admin_group")):
list_of_users.append(member.user.id)
makedirs("cache", exist_ok=True)
jsonSave(list_of_users, f"cache{sep}admins")
2022-12-30 21:36:06 +02:00
2022-12-18 00:36:39 +02:00
# Cache the avatars of group members
2022-12-18 00:25:53 +02:00
if configGet("enabled", "scheduler", "cache_avatars"):
@scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=10))
@scheduler.scheduled_job(trigger="interval", hours=configGet("interval", "scheduler", "cache_avatars"))
async def cache_avatars():
list_of_users = []
async for member in app.get_chat_members(configGet("destination_group"), filter=ChatMembersFilter.SEARCH, query=""):
list_of_users.append(member.user)
for user in list_of_users:
if user.photo != None:
if not path.exists(f'{configGet("cache", "locations")}{sep}avatars{sep}{user.photo.big_file_id}'):
print(f'Pre-cached avatar {user.photo.big_file_id} of {user.id}', flush=True)
await app.download_media(user.photo.big_file_id, file_name=f'{configGet("cache", "locations")}{sep}avatars{sep}{user.photo.big_file_id}')
logWrite("Avatars caching performed")
2022-12-18 00:36:39 +02:00
# Check for birthdays
2022-12-10 16:52:34 +02:00
if configGet("enabled", "scheduler", "birthdays"):
@scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "birthdays"))
async def check_birthdays():
2022-12-14 15:45:46 +02:00
for entry in col_applications.find():
if entry["application"]["2"].strftime("%d.%m") == datetime.now().strftime("%d.%m"):
try:
tg_user = await app.get_users(entry["user"])
await app.send_message( configGet("admin_group"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(relativedelta(datetime.now(), entry["application"]["2"], '%d.%m.%Y').years)) ) # type: ignore
logWrite(f"Notified admins about {entry['user']}'s birthday")
except Exception as exp:
logWrite(f"Could not find user {entry['user']} to send a message about birthday due to '{exp}'")
continue
2022-12-10 16:52:34 +02:00
logWrite("Birthdays check performed")
2022-12-18 00:36:39 +02:00
# Check for expired sponsorships
2022-12-10 16:52:34 +02:00
if configGet("enabled", "scheduler", "sponsorships"):
@scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "sponsorships"))
async def check_sponsors():
2022-12-22 16:05:27 +02:00
for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()+timedelta(days=2)}}):
try:
tg_user = await app.get_users(entry["user"])
until_expiry = relativedelta(datetime.now(), entry["sponsorship"]["expires"]).days
await app.send_message( tg_user, locale("sponsorships_expires", "message").format(until_expiry) ) # type: ignore
logWrite(f"Notified user that sponsorship expires in {until_expiry} days")
except Exception as exp:
logWrite(f"Could not find user {entry['user']} notify about sponsorship expiry due to '{exp}'")
continue
for entry in col_sponsorships.find({"sponsorship.expires": {"$lt": datetime.now()}}):
try:
holo_user = HoloUser(entry["user"])
await app.send_message( entry["user"], locale("sponsorships_expired", "message") ) # type: ignore
await holo_user.label_reset(configGet("destination_group"))
2022-12-22 23:14:35 +02:00
col_sponsorships.find_one_and_delete({"user": holo_user.id})
2022-12-22 16:05:27 +02:00
try:
tg_user = await app.get_users(entry["user"])
2022-12-22 23:14:35 +02:00
logWrite(f"Notified user that sponsorship expired")
2022-12-22 16:05:27 +02:00
except Exception as exp:
logWrite(f"Could not find user {entry['user']} notify about sponsorship expired due to '{exp}'")
except Exception as exp:
logWrite(f"Could not reset label of user {entry['user']} due to '{exp}'")
continue
2022-12-18 00:36:39 +02:00
logWrite("Sponsorships check performed")
# Register all bot commands
@scheduler.scheduled_job(trigger="date", run_date=datetime.now()+timedelta(seconds=3))
async def commands_register():
valid_locales = []
files_locales = listdir(f'{configGet("locale", "locations")}')
for entry in files_locales:
if entry.endswith(".json"):
valid_locales.append(".".join(entry.split(".")[:-1]))
# Registering user commands
commands_list = []
for command in locale("commands"):
commands_list.append(BotCommand(command, locale("commands")[command]))
await app.set_bot_commands(commands_list)
logWrite("Registered user commands for default locale")
# Registering user commands for each locale
for lc in valid_locales:
commands_list = []
for command in locale("commands", locale=lc):
commands_list.append(BotCommand(command, locale("commands",locale=lc)[command]))
await app.set_bot_commands(commands_list, language_code=lc)
logWrite(f"Registered user commands for locale {lc}")
# Registering admin commands
commands_admin_list = []
for command in locale("commands"):
commands_admin_list.append(BotCommand(command, locale("commands")[command]))
for command in locale("commands_admin"):
commands_admin_list.append(BotCommand(command, locale("commands_admin")[command]))
for admin in configGet("admins"):
try:
await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=admin))
logWrite(f"Registered admin commands for admin {admin}")
except bad_request_400.PeerIdInvalid:
pass
try:
await app.set_bot_commands(commands_admin_list, scope=BotCommandScopeChat(chat_id=configGet("owner")))
logWrite(f"Registered admin commands for owner {configGet('owner')}")
except bad_request_400.PeerIdInvalid:
logWrite(f"Could not register commands for bot owner. Perhaps user has not started the bot yet.")
# Registering admin group commands
commands_group_admin_list = []
for command in locale("commands_group_admin"):
commands_group_admin_list.append(BotCommand(command, locale("commands_group_admin")[command]))
try:
await app.set_bot_commands(commands_group_admin_list, scope=BotCommandScopeChat(chat_id=configGet("admin_group")))
logWrite("Registered admin group commands for default locale")
except bad_request_400.ChannelInvalid:
logWrite(f"Could not register commands for admin group. Bot is likely not in the group.")
# Registering destination group commands
commands_group_destination_list = []
for command in locale("commands_group_destination"):
commands_group_destination_list.append(BotCommand(command, locale("commands_group_destination")[command]))
try:
await app.set_bot_commands(commands_group_destination_list, scope=BotCommandScopeChat(chat_id=configGet("destination_group")))
logWrite("Registered destination group commands")
except bad_request_400.ChannelInvalid:
logWrite(f"Could not register commands for destination group. Bot is likely not in the group.")