"""Admin-only command handlers: import, export, remove and purge."""

import asyncio
import logging
from glob import iglob
from io import BytesIO
from os import getcwd, makedirs, path, remove
from pathlib import Path
from shutil import disk_usage, rmtree
from traceback import format_exc
from uuid import uuid4
from zipfile import ZipFile

from convopyro import listen_message
from photosapi_client.errors import UnexpectedStatus
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.types import (
    KeyboardButton,
    Message,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
)
from ujson import loads

from classes.pyroclient import PyroClient
from modules.api_client import (
    BodyPhotoUpload,
    File,
    client,
    photo_delete,
    photo_upload,
    video_delete,
    video_upload,
)
from modules.utils import USERS_WITH_CONTEXT, extract_and_save

logger = logging.getLogger(__name__)


def _discard_import_files(archive_path: Path, extract_dir: Path) -> None:
    """Best-effort removal of a downloaded archive and its extraction directory."""
    try:
        remove(archive_path)
    except FileNotFoundError:
        # Already removed after a successful unpack, or never downloaded.
        pass
    except OSError as exc:
        # Cleanup runs inside a ``finally``; never mask the original error.
        logger.warning("Could not remove '%s': %s", archive_path, exc)

    rmtree(extract_dir, ignore_errors=True)


async def _upload_imported_files(
    app: PyroClient, message: Message, locale, extract_dir: Path
) -> None:
    """Upload every regular file under *extract_dir*, reporting failures to the chat."""
    # "**" only recurses when it is a path component of its own, hence the
    # explicit separator. Directories yielded by the glob are skipped below.
    for filename in iglob(str(extract_dir / "**"), recursive=True):
        if not path.isfile(filename):
            continue

        with open(filename, "rb") as fh:
            photo_bytes = BytesIO(fh.read())

        try:
            # VIDEO SUPPORT IS PLANNED HERE TOO
            # NOTE(review): every file is sent as "image/jpeg" regardless of
            # its real type - confirm the API tolerates this.
            uploaded = await photo_upload(
                app.config["posting"]["api"]["album"],
                client=client,
                body=BodyPhotoUpload(
                    File(photo_bytes, Path(filename).name, "image/jpeg")
                ),
                ignore_duplicates=app.config["submission"]["allow_duplicates"],
                compress=False,
                caption="queue",
            )
        except UnexpectedStatus as exc:
            logger.error(
                "Could not upload '%s' from '%s': %s",
                filename,
                extract_dir,
                exc,
            )
            await message.reply_text(
                app._(
                    "import_upload_error_other",
                    "message",
                    locale=locale,
                ).format(path.basename(filename)),
                disable_notification=True,
            )
            continue

        uploaded_dict = loads(uploaded.content.decode("utf-8"))

        if "duplicates" not in uploaded_dict:
            logger.info(
                "Uploaded '%s' from '%s' and got ID %s",
                filename,
                extract_dir,
                uploaded.parsed.id,
            )
            continue

        logger.warning(
            "Could not upload '%s' from '%s'. Duplicates: %s",
            filename,
            extract_dir,
            str(uploaded_dict["duplicates"]),
        )

        # A "duplicates" key with an empty list is reported as a generic error.
        error_key = (
            "import_upload_error_duplicate"
            if uploaded_dict["duplicates"]
            else "import_upload_error_other"
        )
        await message.reply_text(
            app._(error_key, "message", locale=locale).format(
                path.basename(filename)
            ),
            disable_notification=True,
        )


@Client.on_message(
    ~filters.scheduled & filters.command(["import"], prefixes=["", "/"])
)
async def cmd_import(app: PyroClient, message: Message):
    """Import a ZIP archive sent by an admin and upload its files to the album.

    The handler asks for an archive, waits for the next message in the chat,
    validates it (must be a document with MIME type ``application/zip`` and
    the working directory's disk must have at least three times its size
    free), downloads and unpacks it into a uniquely named temporary
    directory, uploads every extracted file through ``photo_upload`` and
    finally removes the temporary files.

    Args:
        app: Bot client providing config, localization and the admin list.
        message: The command message; ignored unless sent by an admin or
            when the sender already has a pending conversation.
    """
    if message.from_user.id not in app.admins:
        return

    if message.from_user.id in USERS_WITH_CONTEXT:
        return

    USERS_WITH_CONTEXT.append(message.from_user.id)

    try:
        user = await app.find_user(message.from_user)

        await message.reply_text(
            app._("import_request", "message", locale=user.locale)
        )

        answer = await listen_message(app, message.chat.id, timeout=600)
    finally:
        # Always release the conversation lock, otherwise a failure above
        # would make every later command of this user be silently ignored.
        USERS_WITH_CONTEXT.remove(message.from_user.id)

    if answer is None:
        await message.reply_text(
            app._("import_ignored", "message", locale=user.locale),
            quote=True,
        )
        return

    if answer.text == "/cancel":
        await answer.reply_text(app._("import_abort", "message", locale=user.locale))
        return

    if answer.document is None:
        await answer.reply_text(
            app._(
                "import_invalid_media",
                "message",
                locale=user.locale,
            ),
            quote=True,
        )
        return

    if answer.document.mime_type != "application/zip":
        await answer.reply_text(
            app._("import_invalid_mime", "message", locale=user.locale),
            quote=True,
        )
        return

    # Require room for the archive itself plus its unpacked contents.
    free_space = disk_usage(getcwd()).free
    if free_space < answer.document.file_size * 3:
        await message.reply_text(
            app._("import_too_big", "message", locale=user.locale).format(
                answer.document.file_size // (2**30),
                free_space // (2**30),
            )
        )
        return

    tmp_dir = str(uuid4())
    tmp_root = Path(app.config["locations"]["tmp"])
    extract_dir = tmp_root / tmp_dir
    archive_path = tmp_root / answer.document.file_id

    logger.info(
        "Importing '%s' file %s bytes big (TMP ID %s)",
        answer.document.file_name,
        answer.document.file_size,
        tmp_dir,
    )

    makedirs(extract_dir, exist_ok=True)

    try:
        downloading = await answer.reply_text(
            app._("import_downloading", "message", locale=user.locale),
            quote=True,
        )

        await app.download_media(answer, file_name=str(archive_path))

        await downloading.edit(
            app._("import_unpacking", "message", locale=user.locale)
        )

        try:
            with ZipFile(archive_path, "r") as handle:
                tasks = [
                    extract_and_save(handle, name, extract_dir)
                    for name in handle.namelist()
                ]
                await asyncio.gather(*tasks)
        except Exception as exc:
            logger.error(
                "Could not import '%s' due to %s: %s",
                answer.document.file_name,
                exc,
                format_exc(),
            )
            await answer.reply_text(
                app._("import_unpack_error", "message", locale=user.locale).format(
                    exc, format_exc()
                )
            )
            return

        logger.info("Downloaded '%s' - awaiting upload", answer.document.file_name)

        await downloading.edit(
            app._("import_uploading", "message", locale=user.locale)
        )

        # The archive is no longer needed; free its space before uploading.
        remove(archive_path)

        await _upload_imported_files(app, message, user.locale, extract_dir)

        await downloading.delete()

        logger.info("Removing '%s' after uploading", extract_dir)
    finally:
        # Runs on success, on unpack errors and on unexpected exceptions so
        # that no temporary files are left behind.
        _discard_import_files(archive_path, extract_dir)

    await answer.reply_text(
        app._("import_finished", "message", locale=user.locale),
        quote=True,
    )


@Client.on_message(
    ~filters.scheduled & filters.command(["export"], prefixes=["", "/"])
)
async def cmd_export(app: PyroClient, message: Message):
    """Handle the export command; currently only performs the admin check."""
    if message.from_user.id not in app.admins:
        return


@Client.on_message(
    ~filters.scheduled & filters.command(["remove"], prefixes=["", "/"])
)
async def cmd_remove(app: PyroClient, message: Message):
    """Remove a photo or video by ID after a two-step dialogue with an admin.

    The first reply is taken as the media ID, the second one (picked from a
    reply keyboard) selects whether ``photo_delete`` or ``video_delete`` is
    called. Either step can be aborted with ``/cancel``.

    Args:
        app: Bot client providing config, localization and the admin list.
        message: The command message; ignored unless sent by an admin or
            when the sender already has a pending conversation.
    """
    if message.from_user.id not in app.admins:
        return

    if message.from_user.id in USERS_WITH_CONTEXT:
        return

    USERS_WITH_CONTEXT.append(message.from_user.id)

    try:
        user = await app.find_user(message.from_user)

        await message.reply_text(
            app._("remove_request", "message", locale=user.locale)
        )

        answer_id = await app.listen.Message(
            filters.text & ~filters.me,
            id=filters.user(message.from_user.id),
            timeout=600,
        )
    finally:
        # Release the conversation lock even if waiting for the reply failed.
        USERS_WITH_CONTEXT.remove(message.from_user.id)

    if answer_id is None:
        await message.reply_text(
            app._("remove_ignored", "message", locale=user.locale),
            quote=True,
        )
        return

    if answer_id.text == "/cancel":
        await answer_id.reply_text(app._("remove_abort", "message", locale=user.locale))
        return

    await message.reply_text(
        app._("remove_kind", "message", locale=user.locale),
        reply_markup=ReplyKeyboardMarkup(
            [
                [
                    KeyboardButton(app._("photo", "button", locale=user.locale)),
                    KeyboardButton(app._("video", "button", locale=user.locale)),
                ]
            ],
            resize_keyboard=True,
            one_time_keyboard=True,
        ),
    )

    USERS_WITH_CONTEXT.append(message.from_user.id)

    try:
        answer_kind = await app.listen.Message(
            filters.text & ~filters.me,
            id=filters.user(message.from_user.id),
            timeout=600,
        )
    finally:
        USERS_WITH_CONTEXT.remove(message.from_user.id)

    if answer_kind is None:
        await message.reply_text(
            app._("remove_ignored", "message", locale=user.locale),
            quote=True,
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    if answer_kind.text == "/cancel":
        await answer_kind.reply_text(
            app._("remove_abort", "message", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    # The button label is matched against every locale, not just the user's.
    if answer_kind.text in app.in_all_locales("photo", "button"):
        func = photo_delete
    elif answer_kind.text in app.in_all_locales("video", "button"):
        func = video_delete
    else:
        await answer_kind.reply_text(
            app._("remove_unknown", "message", locale=user.locale).format(
                app._("photo", "button", locale=user.locale),
                app._("video", "button", locale=user.locale),
            ),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    response = await func(id=answer_id.text, client=client)

    # The delete call is treated as successful when it returns nothing.
    if response is None:
        logger.info(
            "Removed %s '%s' by request of user %s",
            answer_kind.text,
            answer_id.text,
            answer_id.from_user.id,
        )
        await answer_kind.reply_text(
            app._("remove_success", "message", locale=user.locale).format(
                answer_id.text
            ),
            reply_markup=ReplyKeyboardRemove(),
        )
    else:
        logger.warning(
            "Could not remove %s '%s' by request of user %s",
            answer_kind.text,
            answer_id.text,
            answer_id.from_user.id,
        )
        await answer_kind.reply_text(
            app._("remove_failure", "message", locale=user.locale).format(
                answer_id.text
            ),
            reply_markup=ReplyKeyboardRemove(),
        )


@Client.on_message(
    ~filters.scheduled & filters.command(["purge"], prefixes=["", "/"])
)
async def cmd_purge(app: PyroClient, message: Message):
    """Handle the purge command; currently only performs the admin check."""
    if message.from_user.id not in app.admins:
        return