95 lines
3.2 KiB
Python
95 lines
3.2 KiB
Python
from datetime import datetime
|
|
from typing import List, Mapping, Union
|
|
|
|
from convopyro import listen_message
|
|
from pyrogram import filters
|
|
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
|
|
from ujson import loads
|
|
|
|
from classes.pyroclient import PyroClient
|
|
from modules import custom_filters
|
|
from modules.database import col_entries
|
|
|
|
|
|
@PyroClient.on_message(
    ~filters.scheduled & filters.private & custom_filters.owner & filters.command(["import"], prefixes=["/"])  # type: ignore
)
async def command_import(app: PyroClient, message: Message):
    """Handle ``/import``: read a JSON document from the chat and store its entries.

    Flow:
      1. Prompt the user and wait (timeout argument ``300`` passed to
         ``listen_message``) for a reply in the same chat.
      2. A missing reply or the text ``/cancel`` aborts with the "cancelled"
         message. A reply without an ``application/json`` document triggers
         the "import_invalid_filetype" message and another wait.
      3. The document is downloaded in memory, decoded and validated; any
         structural problem is answered with "import_invalid" and an
         unparsable date with "import_invalid_date". Nothing is inserted
         unless every entry is valid.
      4. Valid entries are inserted into ``col_entries`` and the number of
         imported entries is reported with "import_finished".

    Args:
        app: Client instance that received the update.
        message: The ``/import`` command message.
    """
    user = await app.find_user(message.from_user)

    await message.reply_text(
        app._("import", "messages", locale=user.locale),
        reply_markup=ForceReply(placeholder=""),
    )

    while True:
        answer = await listen_message(app, message.chat.id, 300)

        if answer is None or answer.text == "/cancel":
            await message.reply_text(
                app._("cancelled", "messages", locale=user.locale),
                reply_markup=ReplyKeyboardRemove(),
            )
            return

        if answer.document is None or answer.document.mime_type != "application/json":
            await answer.reply_text(
                app._("import_invalid_filetype", "messages", locale=user.locale).format(
                    cancel_notice=app._("cancel", "messages", locale=user.locale)
                )
            )
            continue

        break

    file = await app.download_media(answer, in_memory=True)

    try:
        payload = loads(bytes(file.getbuffer()))  # type: ignore
    except ValueError:
        # The MIME type only reflects what the sender declared; the content may
        # still be malformed. ujson reports that with a ValueError subclass.
        await answer.reply_text(
            app._("import_invalid", "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    entries_clean, error_key = _validate_import_entries(payload)

    if error_key is not None:
        await answer.reply_text(
            app._(error_key, "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    # NOTE(review): col_entries looks like a PyMongo-style collection, whose
    # insert_many() rejects an empty document list -- skip the call in that case.
    if entries_clean:
        await col_entries.insert_many(entries_clean)

    await answer.reply_text(
        app._("import_finished", "messages", locale=user.locale).format(
            count=len(entries_clean)
        ),
        reply_markup=ReplyKeyboardRemove(),
    )


def _validate_import_entries(payload: object):
    """Validate decoded import data and convert it to insertable documents.

    Args:
        payload: The value decoded from the uploaded JSON document.

    Returns:
        A ``(entries, error_key)`` tuple. On success ``error_key`` is ``None``
        and ``entries`` is a list of dicts with the keys ``locations``,
        ``garbage_type`` and ``date`` (the latter parsed to ``datetime``).
        On failure ``entries`` is empty and ``error_key`` is the locale key of
        the message to show: ``"import_invalid"`` or ``"import_invalid_date"``.
    """
    # The top-level value must be a list; checking this before iterating keeps
    # scalars (not iterable) and objects (iterate as keys) from slipping through.
    if not isinstance(payload, list):
        return [], "import_invalid"

    cleaned: List[Mapping[str, Union[str, int, datetime]]] = []

    for entry in payload:
        if (
            not isinstance(entry, dict)
            or "locations" not in entry
            or "garbage_type" not in entry
            or "date" not in entry
            or not isinstance(entry["locations"], list)
            or not isinstance(entry["garbage_type"], int)
        ):
            return [], "import_invalid"

        try:
            # Parse once and keep the result instead of validating and then
            # parsing the same string a second time.
            parsed_date = datetime.fromisoformat(str(entry["date"]))
        except (ValueError, TypeError):
            return [], "import_invalid_date"

        cleaned.append(
            {
                "locations": entry["locations"],
                "garbage_type": entry["garbage_type"],
                "date": parsed_date,
            }
        )

    return cleaned, None
|