TelegramBot/plugins/commands/import.py

80 lines
2.6 KiB
Python

from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
from classes.importer.csv import ImporterCSV
from classes.importer.json import ImporterJSON
from classes.pyroclient import PyroClient
from modules import custom_filters
@PyroClient.on_message(
    ~filters.scheduled & filters.private & custom_filters.owner & filters.command(["import"], prefixes=["/"])  # type: ignore
)
async def command_import(app: PyroClient, message: Message) -> None:
    """Handle the ``/import`` command: request a file and import its contents.

    The handler is registered for non-scheduled private messages that pass
    ``custom_filters.owner`` and carry the ``/import`` command.

    Flow:
        1. Prompt the user (with a forced reply) to send a file.
        2. Wait for messages in the same chat. If ``listen_message`` yields
           ``None`` (presumably on its 300 timeout -- confirm against
           convopyro) or the user sends ``/cancel``, report cancellation
           and stop.
        3. Re-prompt until a document with a supported MIME type
           (``application/json`` or ``text/csv``) is received.
        4. Download the document into memory, hand the raw bytes to the
           matching importer and report how many items were imported.

    Args:
        app: The client that received the command; also provides user
            lookup (``find_user``) and localized strings (``_``).
        message: The message that contained the ``/import`` command.
    """
    user = await app.find_user(message.from_user)

    # Single source of truth for supported MIME types: used both to validate
    # the incoming document and to choose the importer. A mapping is used
    # instead of match/case to stay compatible with Python 3.9.
    importers = {
        "application/json": ImporterJSON,
        "text/csv": ImporterCSV,
    }

    await message.reply_text(
        app._("import", "messages", locale=user.locale),
        reply_markup=ForceReply(placeholder=""),
    )

    while True:
        answer = await listen_message(app, message.chat.id, 300)

        # No answer at all or an explicit cancellation ends the conversation.
        if answer is None or answer.text == "/cancel":
            await message.reply_text(
                app._("cancelled", "messages", locale=user.locale),
                reply_markup=ReplyKeyboardRemove(),
            )
            return

        document = answer.document
        if document is not None and document.mime_type in importers:
            break

        # Not a supported file: explain and keep waiting for another message.
        await answer.reply_text(
            app._("import_invalid_filetype", "messages", locale=user.locale).format(
                cancel_notice=app._("cancel", "messages", locale=user.locale)
            )
        )

    file = await app.download_media(answer, in_memory=True)

    # download_media returns None when the download fails or is stopped.
    # Without this guard the handler would raise AttributeError and leave the
    # user without any reply. No dedicated "download failed" string is visible
    # in this file, so the generic invalid-import message is reused.
    if file is None:
        await answer.reply_text(
            app._("import_invalid", "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    data: bytes = bytes(file.getbuffer())  # type: ignore

    importer = importers[document.mime_type]()

    try:
        import_result = await importer.import_data(data)
    except ValueError:
        # The importer rejected the file contents.
        await answer.reply_text(
            app._("import_invalid", "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    await answer.reply_text(
        app._("import_finished", "messages", locale=user.locale).format(
            count=len(import_result)
        ),
        reply_markup=ReplyKeyboardRemove(),
    )