from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove

from classes.importer.csv import ImporterCSV
from classes.importer.json import ImporterJSON
from classes.pyroclient import PyroClient
from modules import custom_filters

# Single source of truth for the accepted upload types: maps a document's MIME
# type to the importer class that handles it. Used both to validate the upload
# and to pick the importer, so the two can never drift apart.
_IMPORTERS = {
    "application/json": ImporterJSON,
    "text/csv": ImporterCSV,
}

# Timeout handed to `listen_message` while waiting for the user's answer
# (presumably seconds -- confirm against convopyro).
_ANSWER_TIMEOUT = 300


@PyroClient.on_message(
    ~filters.scheduled
    & filters.private
    & custom_filters.owner
    & filters.command(["import"], prefixes=["/"])  # type: ignore
)
async def command_import(app: PyroClient, message: Message) -> None:
    """Handle the `/import` command: ask for a file and import its contents.

    The handler prompts the user, then keeps listening in the same chat until
    one of the following happens:

    * no answer arrives (`listen_message` returns `None`) or the user sends
      `/cancel` -- a "cancelled" message is sent and the handler returns;
    * the answer carries a document whose MIME type is a key of `_IMPORTERS`
      -- the document is downloaded into memory and passed to the matching
      importer.

    Any other answer triggers an "invalid file type" reply and the handler
    keeps waiting. If the importer raises `ValueError`, an "invalid import"
    message is sent; otherwise the number of imported items is reported.

    Args:
        app: The client that received the command.
        message: The message containing the `/import` command.
    """
    user = await app.find_user(message.from_user)
    locale = user.locale

    await message.reply_text(
        app._("import", "messages", locale=locale),
        reply_markup=ForceReply(placeholder=""),
    )

    while True:
        answer = await listen_message(app, message.chat.id, _ANSWER_TIMEOUT)

        if answer is None or answer.text == "/cancel":
            await message.reply_text(
                app._("cancelled", "messages", locale=locale),
                reply_markup=ReplyKeyboardRemove(),
            )
            return

        document = answer.document
        importer_class = (
            None if document is None else _IMPORTERS.get(document.mime_type)
        )

        if importer_class is not None:
            break

        # Not a supported document: tell the user and wait for another answer.
        invalid_filetype = app._(
            "import_invalid_filetype", "messages", locale=locale
        )
        await answer.reply_text(
            invalid_filetype.format(
                cancel_notice=app._("cancel", "messages", locale=locale)
            )
        )

    # NOTE(review): `download_media` is typed as possibly returning None, in
    # which case the next line would raise -- confirm whether that can happen
    # here and which message the user should get.
    file = await app.download_media(answer, in_memory=True)

    data: bytes = bytes(file.getbuffer())  # type: ignore

    # The loop above only exits with a supported MIME type, so the importer
    # class is always resolved here (dispatch table keeps 3.9 compatibility).
    importer = importer_class()

    try:
        import_result = await importer.import_data(data)
    except ValueError:
        await answer.reply_text(
            app._("import_invalid", "messages", locale=locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    await answer.reply_text(
        app._("import_finished", "messages", locale=locale).format(
            count=len(import_result)
        ),
        reply_markup=ReplyKeyboardRemove(),
    )