260 lines
9.2 KiB
Python
260 lines
9.2 KiB
Python
import contextlib
|
|
import logging
|
|
from datetime import datetime
|
|
from io import BytesIO
|
|
from os import makedirs, remove, sep
|
|
from pathlib import Path
|
|
from shutil import rmtree
|
|
from time import time
|
|
from traceback import format_exc
|
|
from typing import Dict, List, Tuple, Union
|
|
|
|
import aiofiles
|
|
from aiohttp import ClientSession
|
|
from bson import ObjectId
|
|
from libbot import json_write
|
|
from libbot.i18n.sync import _
|
|
from photosapi_client.errors import UnexpectedStatus
|
|
from pyrogram.errors import bad_request_400
|
|
from pyrogram.types import Message
|
|
from pytimeparse.timeparse import timeparse
|
|
from ujson import dumps, loads
|
|
|
|
from classes.enums.submission_types import SubmissionType
|
|
from classes.exceptions import (
|
|
SubmissionDuplicatesError,
|
|
SubmissionUnavailableError,
|
|
SubmissionUnsupportedError,
|
|
)
|
|
from modules.api_client import (
|
|
BodyPhotoUpload,
|
|
BodyVideoUpload,
|
|
File,
|
|
Photo,
|
|
Video,
|
|
client,
|
|
photo_upload,
|
|
video_upload,
|
|
)
|
|
from modules.database import col_submitted
|
|
from modules.http_client import http_session
|
|
from modules.sender import send_content
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
from datetime import datetime
|
|
from typing import List, Union
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from libbot.pyrogram.classes import PyroClient
|
|
|
|
|
|
class PyroClient(PyroClient):
|
|
def __init__(self, scheduler: AsyncIOScheduler):
|
|
super().__init__(scheduler=scheduler)
|
|
|
|
self.version: float = 0.2
|
|
|
|
self.owner: int = self.config["bot"]["owner"]
|
|
self.admins: List[int] = self.config["bot"]["admins"] + [
|
|
self.config["bot"]["owner"]
|
|
]
|
|
|
|
self.sender_session = ClientSession()
|
|
|
|
self.scopes_placeholders: Dict[str, int] = {
|
|
"owner": self.owner,
|
|
"comments": self.config["posting"]["comments"],
|
|
}
|
|
|
|
async def start(self):
|
|
await super().start()
|
|
|
|
if self.config["reports"]["update"]:
|
|
try:
|
|
async with ClientSession(
|
|
json_serialize=dumps,
|
|
) as http_session:
|
|
check_update = await http_session.get(
|
|
"https://git.end-play.xyz/api/v1/repos/profitroll/TelegramPoster/releases?page=1&limit=1"
|
|
)
|
|
|
|
response = await check_update.json()
|
|
|
|
if len(response) == 0:
|
|
raise ValueError("No bot releases on git found.")
|
|
|
|
if float(response[0]["tag_name"].replace("v", "")) > self.version:
|
|
logger.info(
|
|
"New version %s found (current %s)",
|
|
response[0]["tag_name"].replace("v", ""),
|
|
self.version,
|
|
)
|
|
await self.send_message(
|
|
self.owner,
|
|
self._(
|
|
"update_available",
|
|
"message",
|
|
).format(
|
|
response[0]["tag_name"],
|
|
response[0]["html_url"],
|
|
response[0]["body"],
|
|
),
|
|
)
|
|
else:
|
|
logger.info("No updates found, bot is up to date.")
|
|
except bad_request_400.PeerIdInvalid:
|
|
logger.warning(
|
|
"Could not send startup message to bot owner. Perhaps user has not started the bot yet."
|
|
)
|
|
except Exception as exp:
|
|
logger.exception("Update check failed due to %s: %s", exp, format_exc())
|
|
|
|
if self.config["mode"]["post"]:
|
|
if self.config["posting"]["use_interval"]:
|
|
self.scheduler.add_job(
|
|
send_content,
|
|
"interval",
|
|
seconds=timeparse(self.config["posting"]["interval"]),
|
|
args=[self, self.sender_session],
|
|
)
|
|
else:
|
|
for entry in self.config["posting"]["time"]:
|
|
dt_obj = datetime.strptime(entry, "%H:%M")
|
|
self.scheduler.add_job(
|
|
send_content,
|
|
"cron",
|
|
hour=dt_obj.hour,
|
|
minute=dt_obj.minute,
|
|
args=[self, self.sender_session],
|
|
)
|
|
|
|
async def stop(self):
|
|
makedirs(self.config["locations"]["cache"], exist_ok=True)
|
|
await json_write(
|
|
{"timestamp": time()},
|
|
Path(f"{self.config['locations']['cache']}/shutdown_time"),
|
|
)
|
|
|
|
await http_session.close()
|
|
await self.sender_session.close()
|
|
|
|
await super().stop()
|
|
|
|
async def submit_media(
|
|
self, id: str
|
|
) -> Tuple[Union[Message, None], Union[str, None]]:
|
|
db_entry = col_submitted.find_one({"_id": ObjectId(id)})
|
|
submission = None
|
|
|
|
if db_entry is None:
|
|
raise SubmissionUnavailableError()
|
|
|
|
if db_entry["temp"]["uuid"] is None:
|
|
try:
|
|
submission = await self.get_messages(
|
|
db_entry["user"], db_entry["telegram"]["msg_id"]
|
|
)
|
|
filepath = await self.download_media(
|
|
submission, file_name=self.config["locations"]["tmp"] + sep
|
|
)
|
|
except Exception as exp:
|
|
raise SubmissionUnavailableError() from exp
|
|
|
|
elif not Path(
|
|
f"{self.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}/{db_entry['temp']['file']}",
|
|
).exists():
|
|
raise SubmissionUnavailableError()
|
|
else:
|
|
filepath = Path(
|
|
f"{self.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}/{db_entry['temp']['file']}",
|
|
)
|
|
with contextlib.suppress(Exception):
|
|
submission = await self.get_messages(
|
|
db_entry["user"], db_entry["telegram"]["msg_id"]
|
|
)
|
|
|
|
async with aiofiles.open(str(filepath), "rb") as fh:
|
|
media_bytes = BytesIO(await fh.read())
|
|
|
|
try:
|
|
if db_entry["type"] == SubmissionType.PHOTO.value:
|
|
response = await photo_upload(
|
|
self.config["posting"]["api"]["album"],
|
|
client=client,
|
|
multipart_data=BodyPhotoUpload(
|
|
File(media_bytes, filepath.name, "image/jpeg")
|
|
),
|
|
ignore_duplicates=self.config["submission"]["allow_duplicates"],
|
|
compress=False,
|
|
caption="queue",
|
|
)
|
|
elif db_entry["type"] == SubmissionType.VIDEO.value:
|
|
response = await video_upload(
|
|
self.config["posting"]["api"]["album"],
|
|
client=client,
|
|
multipart_data=BodyVideoUpload(
|
|
File(media_bytes, filepath.name, "video/*")
|
|
),
|
|
caption="queue",
|
|
)
|
|
# elif db_entry["type"] == SubmissionType.ANIMATION.value:
|
|
# response = await video_upload(
|
|
# self.config["posting"]["api"]["album"],
|
|
# client=client,
|
|
# multipart_data=BodyVideoUpload(
|
|
# File(media_bytes, filepath.name, "video/*")
|
|
# ),
|
|
# caption="queue",
|
|
# )
|
|
except UnexpectedStatus as exp:
|
|
raise SubmissionUnsupportedError(str(filepath)) from exp
|
|
|
|
response_dict = (
|
|
{}
|
|
if not hasattr(response, "content")
|
|
else loads(response.content.decode("utf-8"))
|
|
)
|
|
|
|
if "duplicates" in response_dict and len(response_dict["duplicates"]) > 0:
|
|
duplicates = []
|
|
for index, duplicate in enumerate(response_dict["duplicates"]): # type: ignore
|
|
if response_dict["access_token"] is None:
|
|
duplicates.append(
|
|
f"`{duplicate['id']}`:\n{self.config['posting']['api']['address_external']}/photos/{duplicate['id']}"
|
|
)
|
|
else:
|
|
duplicates.append(
|
|
f"`{duplicate['id']}`:\n{self.config['posting']['api']['address_external']}/token/photo/{response_dict['access_token']}?id={index}"
|
|
)
|
|
raise SubmissionDuplicatesError(str(filepath), duplicates)
|
|
|
|
col_submitted.find_one_and_update(
|
|
{"_id": ObjectId(id)}, {"$set": {"done": True}}
|
|
)
|
|
|
|
try:
|
|
if db_entry["temp"]["uuid"] is not None:
|
|
rmtree(
|
|
Path(
|
|
f"{self.config['locations']['data']}/submissions/{db_entry['temp']['uuid']}",
|
|
),
|
|
ignore_errors=True,
|
|
)
|
|
else:
|
|
remove(str(filepath))
|
|
except (FileNotFoundError, NotADirectoryError):
|
|
logger.error("Could not delete '%s' on submission accepted", filepath)
|
|
|
|
return (
|
|
submission,
|
|
response.id if not hasattr(response, "parsed") else response.parsed.id,
|
|
)
|
|
|
|
async def ban_user(self, id: int) -> None:
|
|
pass
|
|
|
|
async def unban_user(self, id: int) -> None:
|
|
pass
|