TelegramBot/classes/pyroclient.py

114 lines
3.7 KiB
Python
Raw Permalink Normal View History

2024-05-26 23:56:11 +03:00
import logging
from datetime import datetime, timedelta
2023-08-27 23:43:16 +03:00
from typing import List, Union
2024-05-26 23:56:11 +03:00
from aiohttp import ClientSession
2023-08-27 23:43:16 +03:00
from apscheduler.triggers.cron import CronTrigger
from libbot.pyrogram.classes import PyroClient as LibPyroClient
2023-08-28 16:41:20 +03:00
from pymongo import ASCENDING, GEOSPHERE, TEXT
2023-08-27 23:43:16 +03:00
from pyrogram.types import User
from classes.location import Location
from classes.pyrouser import PyroUser
2024-05-26 23:56:11 +03:00
from classes.updater import Updater
2023-11-05 15:20:01 +02:00
from modules.database_api import col_locations
2023-08-27 23:43:16 +03:00
from modules.reminder import remind
2024-05-26 23:56:11 +03:00
# Module-level logger named after this module; used by PyroClient.check_updates.
logger = logging.getLogger(__name__)
2023-08-27 23:43:16 +03:00
class PyroClient(LibPyroClient):
    """Bot client built on top of libbot's `PyroClient`.

    Adds scheduled reminders, an optional periodic update check and helper
    methods to look up users and locations in the database.
    """

    def __init__(self, **kwargs) -> None:
        """Initialize the client and register its scheduled jobs.

        ### Args:
            * **kwargs: Passed unchanged to the parent initializer.
        """
        # Set before the parent initializer runs, as in the original ordering.
        self.__version__ = (0, 1, 2)

        super().__init__(**kwargs)

        # NOTE(review): aiohttp recommends creating ClientSession objects while
        # an event loop is running - confirm this initializer is always called
        # from such a context.
        self.updater = Updater(ClientSession())
        self.contexts = []

        # Jobs can only be registered when a scheduler is available.
        if self.scheduler is None:
            return

        # Run the reminder routine every minute.
        self.scheduler.add_job(
            remind, CronTrigger.from_crontab("* * * * *"), args=(self,)
        )

        if self.config["update_checker"]:
            # Cron schedule "0 12 */3 * *", with the first run forced to happen
            # ten seconds from now instead of waiting for the next cron slot.
            self.scheduler.add_job(
                self.check_updates,
                CronTrigger.from_crontab("0 12 */3 * *"),
                next_run_time=datetime.now() + timedelta(seconds=10),
            )

    async def start(self, **kwargs):
        """Create the indexes of the locations collection and start the client.

        ### Args:
            * **kwargs: Passed unchanged to the parent `start()`.

        ### Returns:
            * Whatever the parent `start()` returns.
        """
        # Unique ascending index on the location ID.
        await col_locations.create_index(
            [("id", ASCENDING)], name="location_id", unique=True
        )
        # Geospatial index on the "location" field.
        await col_locations.create_index(
            [("location", GEOSPHERE)],
            name="location_location",
        )
        # Text index on the "name" field.
        await col_locations.create_index([("name", TEXT)], name="location_name")

        return await super().start(**kwargs)

    async def stop(self, **kwargs):
        """Close the updater's HTTP session and stop the client.

        ### Args:
            * **kwargs: Passed unchanged to the parent `stop()`.

        ### Returns:
            * Whatever the parent `stop()` returns.
        """
        try:
            await self.updater.client_session.close()
        finally:
            # The client must be stopped even if closing the session failed.
            result = await super().stop(**kwargs)

        return result

    async def find_user(self, user: Union[int, User]) -> PyroUser:
        """Find User by its ID or User object.

        ### Args:
            * user (`Union[int, User]`): ID or User object to extract ID from.

        ### Returns:
            * `PyroUser`: User in database representation.
        """
        if isinstance(user, int):
            return await PyroUser.find(user)

        # A full User object also carries a language code, passed on as locale.
        return await PyroUser.find(user.id, locale=user.language_code)

    async def get_location(self, id: int) -> Location:
        """Get Location by its ID.

        ### Args:
            * id (`int`): Location's ID.

        ### Returns:
            * `Location`: Location from database as an object.
        """
        return await Location.get(id)

    async def list_locations(self) -> List[Location]:
        """Get all locations stored in database.

        ### Returns:
            * `List[Location]`: List of `Location` objects.
        """
        # Every record is resolved through Location.get() using its "id" field.
        return [
            await Location.get(record["id"]) async for record in col_locations.find({})
        ]

    async def check_updates(self) -> None:
        """Check for updates and send a message to the owner if newer version was found"""
        updater_url = self.config["strings"]["url_updater"]

        try:
            if not await self.updater.check_updates(self.__version__, updater_url):
                return

            release = await self.updater.get_latest_release(updater_url)
        except Exception as exc:
            # This runs as a scheduled job: log the failure and give up until
            # the next run instead of letting the error escape.
            logger.error("Could not fetch the latest version: %s", exc)
            return

        current_version = "v" + ".".join(
            str(subversion) for subversion in self.__version__
        )

        await self.send_message(
            self.owner,
            self._("update_available", "messages").format(
                version_current=current_version,
                version_new=release["tag_name"],
                release_url=release["html_url"],
            ),
        )