This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/modules/scheduled.py
2022-12-14 14:19:20 +01:00

34 lines
2.1 KiB
Python

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from datetime import datetime
from app import app
from modules.utils import configGet, locale, logWrite
from dateutil.relativedelta import relativedelta
from modules.database import col_applications
scheduler = AsyncIOScheduler()
# for user_file in listdir(f"{configGet('data', 'locations')}{sep}users{sep}"):
# filename = fsdecode(f"{configGet('data', 'locations')}{sep}users{sep}{user_file}")
# if filename.endswith(".json"):
# user = jsonLoad(filename)
# if isinstance(user["application"]["2"], str):
# try:
# if ".".join([((user["application"]["2"]).split("."))[0], ((user["application"]["2"]).split("."))[1]]) == datetime.now().strftime("%d.%m"):
# tg_user = await app.get_users(int(user_file.replace(".json", "")))
# await app.send_message( configGet("admin_group"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(relativedelta(datetime.now(), datetime.strptime(user["application"]["2"], '%d.%m.%Y')).years)) )
# except AttributeError:
# continue
if configGet("enabled", "scheduler", "birthdays"):
@scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "birthdays"))
async def check_birthdays():
for entry in col_applications.find({"2": datetime.now().strftime("%d.%m.%Y")}):
tg_user = await app.get_users(entry["user"])
await app.send_message( configGet("admin_group"), locale("birthday", "message").format(str(tg_user.first_name), str(tg_user.username), str(relativedelta(datetime.now(), datetime.strptime(entry["2"], '%d.%m.%Y')).years)) ) # type: ignore
logWrite(f"Notified admins about {entry['user']}'s birthday")
logWrite("Birthdays check performed")
if configGet("enabled", "scheduler", "sponsorships"):
@scheduler.scheduled_job(trigger="cron", hour=configGet("time", "scheduler", "sponsorships"))
async def check_sponsors():
logWrite("Sponsorships check performed")