80 lines
2.6 KiB
Python
80 lines
2.6 KiB
Python
from convopyro import listen_message
|
|
from pyrogram import filters
|
|
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
|
|
|
|
from classes.importer.csv import ImporterCSV
|
|
from classes.importer.json import ImporterJSON
|
|
from classes.pyroclient import PyroClient
|
|
from modules import custom_filters
|
|
|
|
|
|
@PyroClient.on_message(
    ~filters.scheduled & filters.private & custom_filters.owner & filters.command(["import"], prefixes=["/"])  # type: ignore
)
async def command_import(app: PyroClient, message: Message):
    """Drive the interactive ``/import`` conversation.

    Prompts the sender for a file, then keeps listening in the same chat
    until a document with a JSON or CSV MIME type arrives. That document is
    downloaded into memory and passed to the matching importer. The sender
    is then told the size of the import result, or that the data was
    invalid when the importer raises ``ValueError``.

    The conversation ends early with the "cancelled" message when the
    listener yields no answer or the sender replies with ``/cancel``.
    """
    user = await app.find_user(message.from_user)
    locale = user.locale

    # Python 3.9 is still supported, so ``match`` is not an option yet;
    # a MIME type -> importer class mapping does the dispatching instead.
    importers = {
        "application/json": ImporterJSON,
        "text/csv": ImporterCSV,
    }

    prompt = app._("import", "messages", locale=locale)
    await message.reply_text(prompt, reply_markup=ForceReply(placeholder=""))

    while True:
        # 300 is forwarded to listen_message as its timeout
        # (presumably seconds -- confirm against convopyro).
        answer = await listen_message(app, message.chat.id, 300)

        if answer is None or answer.text == "/cancel":
            await message.reply_text(
                app._("cancelled", "messages", locale=locale),
                reply_markup=ReplyKeyboardRemove(),
            )
            return

        document = answer.document
        if document is not None and document.mime_type in importers:
            break

        # Not a supported file: explain and keep waiting for another reply.
        template = app._("import_invalid_filetype", "messages", locale=locale)
        cancel_notice = app._("cancel", "messages", locale=locale)
        await answer.reply_text(template.format(cancel_notice=cancel_notice))

    # NOTE(review): download_media may yield None on a failed download;
    # that case is not handled here -- confirm against Pyrogram docs.
    file = await app.download_media(answer, in_memory=True)

    data: bytes = bytes(file.getbuffer())  # type: ignore

    importer_class = importers.get(answer.document.mime_type)

    if importer_class is None:
        # Defensive fallback; the loop above only lets supported types through.
        template = app._("import_invalid_filetype", "messages", locale=locale)
        await answer.reply_text(
            template.format(cancel_notice=""),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    importer = importer_class()

    try:
        import_result = await importer.import_data(data)
    except ValueError:
        await answer.reply_text(
            app._("import_invalid", "messages", locale=locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    finished = app._("import_finished", "messages", locale=locale)
    await answer.reply_text(
        finished.format(count=len(import_result)),
        reply_markup=ReplyKeyboardRemove(),
    )
|