import contextlib
import logging
from datetime import datetime
from io import BytesIO
from os import makedirs, remove, sep
from pathlib import Path
from shutil import rmtree
from time import time
from traceback import format_exc
from typing import Dict, List, Tuple, Union

import aiofiles
from aiohttp import ClientSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from bson import ObjectId
from libbot import json_write
from libbot.i18n.sync import _
from libbot.pyrogram.classes import PyroClient
from photosapi_client.errors import UnexpectedStatus
from pyrogram.errors import bad_request_400
from pyrogram.types import Message, User
from pytimeparse.timeparse import timeparse
from ujson import dumps, loads

from classes.enums.submission_types import SubmissionType
from classes.exceptions import (
    SubmissionDuplicatesError,
    SubmissionUnavailableError,
    SubmissionUnsupportedError,
)
from classes.pyrouser import PyroUser
from modules.api_client import (
    BodyPhotoUpload,
    BodyVideoUpload,
    File,
    Photo,
    Video,
    client,
    photo_upload,
    video_upload,
)
from modules.database import col_submitted, col_users
from modules.http_client import http_session
from modules.sender import send_content

logger = logging.getLogger(__name__)


# NOTE: this class intentionally reuses the name of the imported libbot base
# class, so after this definition `PyroClient` refers to the subclass below.
class PyroClient(PyroClient):
    """Bot client that adds update checks, scheduled posting and submissions.

    Extends libbot's ``PyroClient`` with:

    * an optional check for a newer release on startup;
    * registration of ``send_content`` jobs on the scheduler;
    * uploading of user submissions to the photos API;
    * lookup/creation of user records in the database.
    """

    def __init__(self, scheduler: AsyncIOScheduler):
        """Initialize the client and cache frequently used config values.

        ### Args:
            * scheduler (`AsyncIOScheduler`): Scheduler passed on to the base client and later used to register posting jobs.
        """
        super().__init__(locales_root=Path("locale"), scheduler=scheduler)

        # Compared against the numeric part of the latest release tag in start()
        self.version: float = 0.3

        self.owner: int = self.config["bot"]["owner"]
        # The owner is always appended to the configured admins
        self.admins: List[int] = self.config["bot"]["admins"] + [
            self.config["bot"]["owner"]
        ]

        # Session handed to every scheduled send_content job; closed in stop()
        self.sender_session = ClientSession()

        self.scopes_placeholders: Dict[str, int] = {
            "owner": self.owner,
            "comments": self.config["posting"]["comments"],
        }

    async def start(self):
        """Start the client, optionally check for updates and schedule posting.

        The update check is best-effort: any failure is logged and startup
        continues. Posting jobs are registered only when posting mode is enabled,
        either on a fixed interval or at the configured times of day.
        """
        await super().start()

        if self.config["reports"]["update"]:
            try:
                # A dedicated short-lived session is used for the update check.
                # It is deliberately NOT named `http_session` to avoid shadowing
                # the module-level session that stop() closes.
                async with ClientSession(
                    json_serialize=dumps,
                ) as update_session:
                    check_update = await update_session.get(
                        "https://git.end-play.xyz/api/v1/repos/profitroll/TelegramPoster/releases?page=1&limit=1"
                    )
                    response = await check_update.json()

                    if not response:
                        raise ValueError("No bot releases on git found.")

                    latest_tag = response[0]["tag_name"]
                    latest_version = latest_tag.replace("v", "")

                    if float(latest_version) > self.version:
                        logger.info(
                            "New version %s found (current %s)",
                            latest_version,
                            self.version,
                        )
                        await self.send_message(
                            self.owner,
                            self._(
                                "update_available",
                                "message",
                            ).format(
                                latest_tag,
                                response[0]["html_url"],
                                response[0]["body"],
                            ),
                        )
                    else:
                        logger.info("No updates found, bot is up to date.")
            except bad_request_400.PeerIdInvalid:
                logger.warning(
                    "Could not send startup message to bot owner. Perhaps user has not started the bot yet."
                )
            except Exception as exp:
                logger.exception("Update check failed due to %s: %s", exp, format_exc())

        if self.config["mode"]["post"]:
            if self.config["posting"]["use_interval"]:
                # Interval is a human-readable duration parsed into seconds
                self.scheduler.add_job(
                    send_content,
                    "interval",
                    seconds=timeparse(self.config["posting"]["interval"]),
                    args=[self, self.sender_session],
                )
            else:
                # One cron job per configured "HH:MM" entry
                for entry in self.config["posting"]["time"]:
                    dt_obj = datetime.strptime(entry, "%H:%M")
                    self.scheduler.add_job(
                        send_content,
                        "cron",
                        hour=dt_obj.hour,
                        minute=dt_obj.minute,
                        args=[self, self.sender_session],
                    )

    async def stop(self):
        """Record the shutdown time, close HTTP sessions and stop the client."""
        makedirs(self.config["locations"]["cache"], exist_ok=True)
        await json_write(
            {"timestamp": time()},
            Path(f"{self.config['locations']['cache']}/shutdown_time"),
        )

        await http_session.close()
        await self.sender_session.close()

        await super().stop()

    async def submit_media(
        self, id: str
    ) -> Tuple[Union[Message, None], Union[str, None]]:
        """Upload a stored submission to the photos API and mark it as done.

        ### Args:
            * id (`str`): String form of the submission's `_id` in the submissions collection.

        ### Raises:
            * `SubmissionUnavailableError`: Submission is not in the database, or its media could not be downloaded / found on disk.
            * `SubmissionUnsupportedError`: Submission type is not photo or video, or the API rejected the upload.
            * `SubmissionDuplicatesError`: The API reported duplicates of the uploaded media.

        ### Returns:
            * `Tuple[Union[Message, None], Union[str, None]]`: Original Telegram message (if it could be fetched) and the ID of the uploaded media.
        """
        db_entry = col_submitted.find_one({"_id": ObjectId(id)})
        submission = None

        if db_entry is None:
            raise SubmissionUnavailableError()

        if db_entry["temp"]["uuid"] is None:
            # No local copy was stored: fetch the media from Telegram again
            try:
                submission = await self.get_messages(
                    db_entry["user"], db_entry["telegram"]["msg_id"]
                )
                downloaded = await self.download_media(
                    submission, file_name=self.config["locations"]["tmp"] + sep
                )
            except Exception as exp:
                raise SubmissionUnavailableError() from exp

            # download_media yields a string path (or None on failure), while
            # the code below relies on Path attributes such as `.name`
            if downloaded is None:
                raise SubmissionUnavailableError()

            filepath = Path(downloaded)
        else:
            filepath = Path(
                f"{self.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}/{db_entry['temp']['file']}",
            )

            if not filepath.exists():
                raise SubmissionUnavailableError()

            # The message is only needed for the return value here, so failing
            # to fetch it is deliberately not treated as an error
            with contextlib.suppress(Exception):
                submission = await self.get_messages(
                    db_entry["user"], db_entry["telegram"]["msg_id"]
                )

        submission_type = db_entry["type"]

        # Reject unsupported types explicitly instead of failing later on an
        # unbound `response` variable
        if submission_type not in (
            SubmissionType.PHOTO.value,
            SubmissionType.VIDEO.value,
        ):
            raise SubmissionUnsupportedError(str(filepath))

        async with aiofiles.open(str(filepath), "rb") as fh:
            media_bytes = BytesIO(await fh.read())

        try:
            if submission_type == SubmissionType.PHOTO.value:
                response = await photo_upload(
                    self.config["posting"]["api"]["album"],
                    client=client,
                    multipart_data=BodyPhotoUpload(
                        File(media_bytes, filepath.name, "image/jpeg")
                    ),
                    ignore_duplicates=self.config["submission"]["allow_duplicates"],
                    compress=False,
                    caption="queue",
                )
            else:
                response = await video_upload(
                    self.config["posting"]["api"]["album"],
                    client=client,
                    multipart_data=BodyVideoUpload(
                        File(media_bytes, filepath.name, "video/*")
                    ),
                    caption="queue",
                )
            # TODO: animation submissions are not uploaded yet; kept for reference:
            # elif db_entry["type"] == SubmissionType.ANIMATION.value:
            #     response = await video_upload(
            #         self.config["posting"]["api"]["album"],
            #         client=client,
            #         multipart_data=BodyVideoUpload(
            #             File(media_bytes, filepath.name, "video/*")
            #         ),
            #         caption="queue",
            #     )
        except UnexpectedStatus as exp:
            raise SubmissionUnsupportedError(str(filepath)) from exp

        # Only responses exposing raw `content` can carry a duplicates report
        response_dict = (
            loads(response.content.decode("utf-8"))
            if hasattr(response, "content")
            else {}
        )

        if response_dict.get("duplicates"):
            api_address = self.config["posting"]["api"]["address_external"]
            # A missing token is treated like an explicit null one
            access_token = response_dict.get("access_token")
            duplicates = []

            for index, duplicate in enumerate(response_dict["duplicates"]):  # type: ignore
                if access_token is None:
                    duplicates.append(
                        f"`{duplicate['id']}`:\n{api_address}/photos/{duplicate['id']}"
                    )
                else:
                    duplicates.append(
                        f"`{duplicate['id']}`:\n{api_address}/token/photo/{access_token}?id={index}"
                    )

            raise SubmissionDuplicatesError(str(filepath), duplicates)

        col_submitted.find_one_and_update(
            {"_id": ObjectId(id)}, {"$set": {"done": True}}
        )

        # Cleanup is best-effort: a failure is logged but the upload still counts
        try:
            if db_entry["temp"]["uuid"] is not None:
                rmtree(
                    Path(
                        f"{self.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}",
                    ),
                    ignore_errors=True,
                )
            else:
                remove(str(filepath))
        except (FileNotFoundError, NotADirectoryError):
            logger.error("Could not delete '%s' on submission accepted", filepath)

        return (
            submission,
            response.parsed.id if hasattr(response, "parsed") else response.id,
        )

    async def find_user(self, user: Union[int, User]) -> PyroUser:
        """Find User by its ID or User object, creating a record if missing

        ### Args:
            * user (`Union[int, User]`): ID or User object to extract ID from

        ### Returns:
            * `PyroUser`: PyroUser object
        """
        is_user_object = isinstance(user, User)
        user_id = user.id if is_user_object else user

        if col_users.find_one({"id": user_id}) is None:
            # Epoch timestamps stand for "no cooldown" / "no subscription"
            col_users.insert_one(
                {
                    "id": user_id,
                    "locale": user.language_code if is_user_object else None,
                    "banned": False,
                    "cooldown": datetime(1970, 1, 1, 0, 0),
                    "subscription": {"expires": datetime(1970, 1, 1, 0, 0)},
                }
            )

        db_record = col_users.find_one({"id": user_id})

        return PyroUser(**db_record)  # type: ignore