import asyncio
import logging
from contextlib import suppress
from glob import iglob
from io import BytesIO
from os import getcwd, makedirs, path, remove
from pathlib import Path
from shutil import disk_usage, rmtree
from traceback import format_exc
from uuid import uuid4
from zipfile import ZipFile

from convopyro import listen_message
from photosapi_client.errors import UnexpectedStatus
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import Message
from ujson import loads

from classes.pyroclient import PyroClient
from modules.api_client import (
    BodyPhotoUploadAlbumsAlbumPhotosPost,
    File,
    client,
    photo_delete,
    photo_upload,
)
from modules.utils import USERS_WITH_CONTEXT, extract_and_save

logger = logging.getLogger(__name__)


def _localized(app: PyroClient, msg: Message, key: str) -> str:
    """Look up the "message" string *key* using the language code of *msg*'s sender."""
    return app._(key, "message", locale=msg.from_user.language_code)


async def _upload_extracted(app: PyroClient, msg: Message, directory: Path) -> None:
    """Upload every regular file found under *directory* to the configured album.

    Files that fail to upload (or are reported as duplicates) are logged and
    reported to the chat via a reply to *msg*; the remaining files are still
    processed.
    """
    # "<directory>/**" with recursive=True walks the whole extracted tree.
    for filename in iglob(path.join(str(directory), "**"), recursive=True):
        if not path.isfile(filename):
            continue

        with open(filename, "rb") as fh:
            photo_bytes = BytesIO(fh.read())

        try:
            uploaded = await photo_upload(
                app.config["posting"]["api"]["album"],
                client=client,
                multipart_data=BodyPhotoUploadAlbumsAlbumPhotosPost(
                    File(photo_bytes, Path(filename).name, "image/jpeg")
                ),
                ignore_duplicates=app.config["submission"]["allow_duplicates"],
                compress=False,
                caption="queue",
            )
        except UnexpectedStatus as exp:
            logger.error(
                "Could not upload '%s' from '%s': %s",
                filename,
                directory,
                exp,
            )
            await msg.reply_text(
                _localized(app, msg, "import_upload_error_other").format(
                    path.basename(filename)
                ),
                disable_notification=True,
            )
            continue

        uploaded_dict = loads(uploaded.content.decode("utf-8"))

        if "duplicates" not in uploaded_dict:
            logger.info(
                "Uploaded '%s' from '%s' and got ID %s",
                filename,
                directory,
                uploaded.parsed.id,
            )
            continue

        logger.warning(
            "Could not upload '%s' from '%s'. Duplicates: %s",
            filename,
            directory,
            str(uploaded_dict["duplicates"]),
        )

        # A non-empty duplicates list gets the dedicated message; an empty one
        # is reported as a generic upload error.
        error_key = (
            "import_upload_error_duplicate"
            if uploaded_dict["duplicates"]
            else "import_upload_error_other"
        )
        await msg.reply_text(
            _localized(app, msg, error_key).format(path.basename(filename)),
            disable_notification=True,
        )


@Client.on_message(~filters.scheduled & filters.command(["import"], prefixes=["", "/"]))
async def cmd_import(app: PyroClient, msg: Message):
    """Handle the ``import`` command: receive a ZIP archive and upload its files.

    Only users listed in ``app.admins`` are served. The handler asks for an
    archive, waits up to 600 seconds for the next message in the chat,
    validates it (document present, ``application/zip`` MIME type, enough free
    disk space), downloads and unpacks it into a temporary directory, uploads
    every extracted file and finally removes the temporary data.
    """
    user_id = msg.from_user.id

    if user_id not in app.admins:
        return

    # Only one pending conversation per user at a time.
    if user_id in USERS_WITH_CONTEXT:
        return

    USERS_WITH_CONTEXT.append(user_id)

    try:
        await msg.reply_text(_localized(app, msg, "import_request"))
        answer = await listen_message(app, msg.chat.id, timeout=600)
    finally:
        # Release the slot even if prompting/listening fails, otherwise the
        # user could never start another conversation.
        USERS_WITH_CONTEXT.remove(user_id)

    if answer is None:
        await msg.reply_text(_localized(app, msg, "import_ignored"), quote=True)
        return

    if answer.text == "/cancel":
        await answer.reply_text(_localized(app, msg, "import_abort"))
        return

    if answer.document is None:
        await answer.reply_text(
            _localized(app, msg, "import_invalid_media"), quote=True
        )
        return

    if answer.document.mime_type != "application/zip":
        await answer.reply_text(
            _localized(app, msg, "import_invalid_mime"), quote=True
        )
        return

    # Require three times the archive size to be free before proceeding.
    free_space = disk_usage(getcwd()).free

    if free_space < answer.document.file_size * 3:
        await msg.reply_text(
            _localized(app, msg, "import_too_big").format(
                answer.document.file_size // (2**30),
                free_space // (2**30),
            )
        )
        return

    tmp_dir = str(uuid4())
    tmp_root = app.config["locations"]["tmp"]
    extract_dir = Path(f"{tmp_root}/{tmp_dir}")
    tmp_path = Path(f"{tmp_root}/{answer.document.file_id}")

    logger.info(
        "Importing '%s' file %s bytes big (TMP ID %s)",
        answer.document.file_name,
        answer.document.file_size,
        tmp_dir,
    )

    makedirs(extract_dir, exist_ok=True)

    downloading = await answer.reply_text(
        _localized(app, msg, "import_downloading"), quote=True
    )

    await app.download_media(answer, file_name=str(tmp_path))

    await downloading.edit(_localized(app, msg, "import_unpacking"))

    try:
        with ZipFile(tmp_path, "r") as handle:
            tasks = [
                extract_and_save(handle, name, extract_dir)
                for name in handle.namelist()
            ]
            await asyncio.gather(*tasks)
    except Exception as exp:
        trace = format_exc()
        logger.error(
            "Could not import '%s' due to %s: %s",
            answer.document.file_name,
            exp,
            trace,
        )
        # Drop partially extracted data before replying, so a failing reply
        # cannot leave it behind.
        rmtree(extract_dir, ignore_errors=True)
        await answer.reply_text(
            _localized(app, msg, "import_unpack_error").format(exp, trace)
        )
        return
    finally:
        # The archive is no longer needed whether or not unpacking succeeded.
        with suppress(FileNotFoundError):
            remove(tmp_path)

    logger.info("Downloaded '%s' - awaiting upload", answer.document.file_name)

    await downloading.edit(_localized(app, msg, "import_uploading"))

    try:
        await _upload_extracted(app, msg, extract_dir)
    finally:
        logger.info("Removing '%s' after uploading", extract_dir)
        rmtree(extract_dir, ignore_errors=True)

    await downloading.delete()

    await answer.reply_text(_localized(app, msg, "import_finished"), quote=True)

    return


@Client.on_message(~filters.scheduled & filters.command(["export"], prefixes=["", "/"]))
async def cmd_export(app: PyroClient, msg: Message):
    """Handle the ``export`` command; currently only performs the admin check."""
    if msg.from_user.id not in app.admins:
        return


@Client.on_message(~filters.scheduled & filters.command(["remove"], prefixes=["", "/"]))
async def cmd_remove(app: PyroClient, msg: Message):
    """Handle the ``remove`` command: ask for a photo ID and delete that photo.

    Only users listed in ``app.admins`` are served. The handler waits up to
    600 seconds for the next message in the chat and passes its text to
    ``photo_delete`` as the ID, then reports success or failure.
    """
    user_id = msg.from_user.id

    if user_id not in app.admins:
        return

    # Only one pending conversation per user at a time.
    if user_id in USERS_WITH_CONTEXT:
        return

    USERS_WITH_CONTEXT.append(user_id)

    try:
        await msg.reply_text(_localized(app, msg, "remove_request"))
        answer = await listen_message(app, msg.chat.id, timeout=600)
    finally:
        # Release the slot even if prompting/listening fails.
        USERS_WITH_CONTEXT.remove(user_id)

    if answer is None:
        await msg.reply_text(_localized(app, msg, "remove_ignored"), quote=True)
        return

    if answer.text == "/cancel":
        await answer.reply_text(_localized(app, msg, "remove_abort"))
        return

    # NOTE(review): answer.text is passed through unchecked, so a non-text
    # reply would presumably forward None as the ID - confirm desired handling.
    response = await photo_delete(id=answer.text, client=client)

    if response:
        logger.info(
            "Removed '%s' by request of user %s", answer.text, answer.from_user.id
        )
        await answer.reply_text(
            _localized(app, msg, "remove_success").format(answer.text)
        )
    else:
        logger.warning(
            "Could not remove '%s' by request of user %s",
            answer.text,
            answer.from_user.id,
        )
        await answer.reply_text(
            _localized(app, msg, "remove_failure").format(answer.text)
        )


@Client.on_message(~filters.scheduled & filters.command(["purge"], prefixes=["", "/"]))
async def cmd_purge(app: PyroClient, msg: Message):
    """Handle the ``purge`` command; currently only performs the admin check."""
    if msg.from_user.id not in app.admins:
        return