This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/modules/scheduled.py

465 lines
18 KiB
Python

"""Automatically register commands and execute
some scheduled tasks is the main idea of this module"""
from asyncio import sleep
from os import listdir, makedirs, path, sep
from traceback import format_exc
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime, timedelta
from ujson import dumps
from app import app
from pyrogram.types import (
BotCommand,
BotCommandScopeChat,
BotCommandScopeChatAdministrators,
)
from pyrogram.errors import bad_request_400
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from classes.holo_user import HoloUser
from modules.utils import configGet, jsonLoad, jsonSave, locale, logWrite
from dateutil.relativedelta import relativedelta
from modules.database import (
col_applications,
col_sponsorships,
col_youtube,
col_warnings,
)
from xmltodict import parse
from requests import get
scheduler = AsyncIOScheduler()
if configGet("enabled", "scheduler", "cache_members"):
@scheduler.scheduled_job(
trigger="interval", seconds=configGet("interval", "scheduler", "cache_members")
)
async def cache_group_members():
list_of_users = []
async for member in app.get_chat_members(configGet("users", "groups")):
list_of_users.append(member.user.id)
makedirs(configGet("cache", "locations"), exist_ok=True)
jsonSave(
list_of_users, path.join(configGet("cache", "locations"), "group_members")
)
if configGet("debug") is True:
logWrite("User group caching performed", debug=True)
if configGet("enabled", "scheduler", "cache_admins"):
@scheduler.scheduled_job(
trigger="interval", seconds=configGet("interval", "scheduler", "cache_admins")
)
async def cache_admins():
list_of_users = []
async for member in app.get_chat_members(configGet("admin", "groups")):
list_of_users.append(member.user.id)
makedirs(configGet("cache", "locations"), exist_ok=True)
jsonSave(list_of_users, path.join(configGet("cache", "locations"), "admins"))
if configGet("debug") is True:
logWrite("Admin group caching performed", debug=True)
# Cache the avatars of group members
if configGet("enabled", "scheduler", "cache_avatars"):
@scheduler.scheduled_job(
trigger="date", run_date=datetime.now() + timedelta(seconds=15)
)
@scheduler.scheduled_job(
trigger="interval", hours=configGet("interval", "scheduler", "cache_avatars")
)
async def cache_avatars():
list_of_users = []
async for member in app.get_chat_members(
configGet("users", "groups"), filter=ChatMembersFilter.SEARCH, query=""
):
list_of_users.append(member.user)
for user in list_of_users:
if user.photo != None:
if not path.exists(
f'{configGet("cache", "locations")}{sep}avatars{sep}{user.photo.big_file_id}'
):
print(
f"Pre-cached avatar {user.photo.big_file_id} of {user.id}",
flush=True,
)
await app.download_media(
user.photo.big_file_id,
file_name=path.join(
configGet("cache", "locations"),
"avatars",
user.photo.big_file_id,
),
)
logWrite("Avatars caching performed")
# Check for birthdays
if configGet("enabled", "features", "applications") is True:
if configGet("enabled", "scheduler", "birthdays") is True:
@scheduler.scheduled_job(
trigger="cron", hour=configGet("time", "scheduler", "birthdays")
)
async def check_birthdays():
for entry in col_applications.find():
if entry["application"]["2"].strftime(
"%d.%m"
) == datetime.now().strftime("%d.%m"):
try:
if entry["user"] not in jsonLoad(
path.join(configGet("cache", "locations"), "group_members")
):
continue
tg_user = await app.get_users(entry["user"])
await app.send_message(configGet("admin", "groups"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(relativedelta(datetime.now(), entry["application"]["2"], "%d.%m.%Y").years))) # type: ignore
logWrite(f"Notified admins about {entry['user']}'s birthday")
except Exception as exp:
logWrite(
f"Could not find user {entry['user']} to send a message about birthday due to '{exp}'"
)
continue
logWrite("Birthdays check performed")
# Check for expired sponsorships
if configGet("enabled", "features", "sponsorships") is True:
if configGet("enabled", "scheduler", "sponsorships") is True:
@scheduler.scheduled_job(
trigger="cron", hour=configGet("time", "scheduler", "sponsorships")
)
async def check_sponsors():
for entry in col_sponsorships.find(
{"sponsorship.expires": {"$lt": datetime.now() + timedelta(days=3)}}
):
try:
if entry["user"] not in jsonLoad(
path.join(configGet("cache", "locations"), "group_members")
):
continue
tg_user = await app.get_users(entry["user"])
until_expiry = (
abs(
relativedelta(
datetime.now(), entry["sponsorship"]["expires"]
).days
)
+ 1
)
await app.send_message(tg_user.id, locale("sponsorships_expires", "message").format(until_expiry)) # type: ignore
logWrite(
f"Notified user {entry['user']} that sponsorship expires in {until_expiry} days"
)
except Exception as exp:
logWrite(
f"Could not find user {entry['user']} notify about sponsorship expiry due to '{exp}'"
)
continue
for entry in col_sponsorships.find(
{
"sponsorship.expires": {
"$lt": datetime.now()
- timedelta(
days=configGet("grayout_days", "scheduler", "sponsorships")
)
}
}
):
try:
holo_user = HoloUser(entry["user"])
col_sponsorships.find_one_and_delete({"user": holo_user.id})
if entry["user"] not in jsonLoad(
path.join(configGet("cache", "locations"), "group_members")
):
continue
await app.send_message(entry["user"], locale("sponsorships_expired", "message")) # type: ignore
await holo_user.label_reset(configGet("users", "groups"))
try:
tg_user = await app.get_users(entry["user"])
logWrite(
f"Notified user {entry['user']} that sponsorship expired"
)
except Exception as exp:
logWrite(
f"Could not find user {entry['user']} notify about sponsorship expired due to '{exp}'"
)
except Exception as exp:
logWrite(
f"Could not reset label of user {entry['user']} due to '{exp}'"
)
continue
logWrite("Sponsorships check performed")
# Revoke old warnings
if configGet("enabled", "features", "warnings") is True:
if configGet("enabled", "scheduler", "warnings_revocation") is True:
@scheduler.scheduled_job(
trigger="date", run_date=datetime.now() + timedelta(seconds=10)
)
@scheduler.scheduled_job(
trigger="interval",
hours=configGet("interval", "scheduler", "warnings_revocation"),
)
async def revoke_warnings():
for warning in list(
col_warnings.find(
{
"active": True,
"date": {"$lt": datetime.now() - timedelta(days=90)},
}
)
):
if (
col_warnings.count_documents(
{
"user": warning["user"],
"active": True,
"date": {"$gt": datetime.now() - timedelta(days=90)},
}
)
== 0
):
col_warnings.update_one(
{"_id": warning["_id"]},
{"$set": {"active": False, "revoke_date": datetime.now()}},
)
logWrite(
f'Revoked warning {str(warning["_id"])} of user {warning["user"]} because no active warnings for the last 90 days found.'
)
await app.send_message(
configGet("admin", "groups"),
locale("warning_revoked_auto", "message").format(
warning["user"], warning["date"].strftime("%d.%m.%Y")
),
)
# Register all bot commands
@scheduler.scheduled_job(
trigger="date", run_date=datetime.now() + timedelta(seconds=10)
)
async def commands_register():
commands = {
"users": [],
"admins": [],
"owner": [],
"group_users": [],
"group_admins": [],
"group_users_admins": [],
"locales": {},
}
commands_raw = {
"users": [],
"admins": [],
"owner": [],
"group_users": [],
"group_admins": [],
"group_users_admins": [],
"locales": {},
}
valid_locales = []
files_locales = listdir(f'{configGet("locale", "locations")}')
for entry in files_locales:
if entry.endswith(".json"):
valid_locales.append(".".join(entry.split(".")[:-1]))
commands["locales"][".".join(entry.split(".")[:-1])] = {
"users": [],
"admins": [],
"owner": [],
"group_users": [],
"group_admins": [],
"group_users_admins": [],
}
if configGet("debug") is True:
commands_raw["locales"][".".join(entry.split(".")[:-1])] = {
"users": [],
"admins": [],
"owner": [],
"group_users": [],
"group_admins": [],
"group_users_admins": [],
}
config_modules = configGet("features")
config_commands = configGet("commands")
for command in config_commands:
enabled = False
for module in config_commands[command]["modules"]:
if config_modules[module]["enabled"] is True:
enabled = True
if enabled is False:
logWrite(f"Not registering {command} at all", debug=True)
continue
for permission in config_commands[command]["permissions"]:
commands[permission].append(
BotCommand(command, locale("commands")[command])
)
if configGet("debug") is True:
commands_raw[permission].append(
{f"{command}": locale("commands")[command]}
)
logWrite(f"Registering {command} for {permission}")
for lc in valid_locales:
commands["locales"][lc][permission].append(
BotCommand(command, locale("commands", locale=lc)[command])
)
if configGet("debug") is True:
commands_raw["locales"][lc][permission].append(
{f"{command}": locale("commands", locale=lc)[command]}
)
logWrite(f"Registering {command} for {permission} [{lc}]")
# Registering user commands
await app.set_bot_commands(commands["users"])
logWrite("Registered user commands for default locale")
# Registering user commands for each locale
for lc in valid_locales:
await app.set_bot_commands(commands["locales"][lc]["users"], language_code=lc)
logWrite(f"Registered user commands for locale {lc}")
# Registering admin commands
for admin in configGet("admins"):
try:
await app.set_bot_commands(
commands["admins"] + commands["users"],
scope=BotCommandScopeChat(chat_id=admin),
)
logWrite(f"Registered admin commands for admin {admin}")
except bad_request_400.PeerIdInvalid:
pass
# Registering owner commands
try:
await app.set_bot_commands(
commands["admins"] + commands["owner"] + commands["users"],
scope=BotCommandScopeChat(chat_id=configGet("owner")),
)
for lc in valid_locales:
await app.set_bot_commands(
commands["locales"][lc]["admins"]
+ commands["locales"][lc]["owner"]
+ commands["locales"][lc]["users"],
scope=BotCommandScopeChat(chat_id=configGet("owner")),
)
logWrite(f"Registered admin commands for owner {configGet('owner')}")
except bad_request_400.PeerIdInvalid:
pass
# Registering admin group commands
try:
await app.set_bot_commands(
commands["group_admins"],
scope=BotCommandScopeChat(chat_id=configGet("admin", "groups")),
)
logWrite("Registered admin group commands for default locale")
except bad_request_400.ChannelInvalid:
logWrite(
f"Could not register commands for admin group. Bot is likely not in the group."
)
# Registering destination group commands
try:
await app.set_bot_commands(
commands["group_users"],
scope=BotCommandScopeChat(chat_id=configGet("users", "groups")),
)
logWrite("Registered destination group commands")
except bad_request_400.ChannelInvalid:
logWrite(
f"Could not register commands for destination group. Bot is likely not in the group."
)
# Registering destination group admin commands
try:
await app.set_bot_commands(
commands["group_users_admins"],
scope=BotCommandScopeChatAdministrators(
chat_id=configGet("users", "groups")
),
)
logWrite("Registered destination group admin commands")
except bad_request_400.ChannelInvalid:
logWrite(
f"Could not register admin commands for destination group. Bot is likely not in the group."
)
if configGet("debug") is True:
print(commands, flush=True)
logWrite(
f"Complete commands registration:\n{dumps(commands_raw, indent=4, ensure_ascii=False, encode_html_chars=False)}",
debug=True,
)
if configGet("enabled", "scheduler", "channels_monitor"):
@scheduler.scheduled_job(
trigger="interval",
minutes=configGet("interval", "scheduler", "channels_monitor"),
)
async def channels_monitor():
for channel in configGet("channels", "scheduler", "channels_monitor"):
if configGet("debug") is True:
logWrite(
f'Processing videos of {channel["name"]} ({channel["id"]})',
debug=True,
)
try:
req = get(
f'https://www.youtube.com/feeds/videos.xml?channel_id={channel["id"]}'
)
parsed = parse(req.content)
if "feed" not in parsed:
continue
if "entry" not in parsed["feed"]:
continue
for entry in parsed["feed"]["entry"]:
if "yt:videoId" not in entry:
continue
if (
col_youtube.find_one(
{"channel": channel["id"], "video": entry["yt:videoId"]}
)
is None
):
col_youtube.insert_one(
{
"channel": channel["id"],
"video": entry["yt:videoId"],
"date": datetime.fromisoformat(entry["published"]),
}
)
await app.send_message(
configGet("users", "groups"),
locale("youtube_video", "message").format(
channel["name"],
channel["link"],
entry["title"],
entry["link"]["@href"],
),
disable_web_page_preview=False,
)
await sleep(2)
except Exception as exp:
logWrite(
f'Could not get last videos of {channel["name"]} ({channel["id"]}) due to {exp}: {format_exc()}'
)
if configGet("debug") is True:
logWrite("Admin group caching performed", debug=True)