This repository has been archived on 2024-10-10. You can view files and clone it, but cannot push or open issues or pull requests.

99 lines
3.4 KiB
Python

from datetime import datetime
from typing import List, Mapping, Union
from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
from ujson import loads
from classes.pyroclient import PyroClient
from modules import custom_filters
from modules.database import col_entries
def _validate_entries(entries) -> tuple:
    """Validate decoded import data and convert it into insertable documents.

    Args:
        entries: The value decoded from the uploaded JSON document. It is
            expected to be a list of objects, each carrying ``locations``
            (list), ``garbage_type`` (int) and ``date`` (ISO 8601 string).

    Returns:
        A ``(documents, error_key)`` pair. On success ``error_key`` is
        ``None`` and ``documents`` holds one dict per entry with ``date``
        parsed into a ``datetime``. On failure ``documents`` is empty and
        ``error_key`` names the message to show: ``"import_invalid"`` for
        structural problems, ``"import_invalid_date"`` for an unparsable
        date.
    """
    # The top-level type must be checked before iterating: scalars are not
    # iterable at all and a dict would iterate over its keys.
    if not isinstance(entries, list):
        return [], "import_invalid"

    documents: List[Mapping[str, Union[str, int, datetime]]] = []

    for entry in entries:
        # ``.get`` yields None for a missing key, which fails the type check,
        # so "missing" and "wrong type" are rejected by the same test.
        if (
            not isinstance(entry, dict)
            or not isinstance(entry.get("locations"), list)
            or not isinstance(entry.get("garbage_type"), int)
            or "date" not in entry
        ):
            return [], "import_invalid"

        try:
            parsed_date = datetime.fromisoformat(str(entry["date"]))
        except (ValueError, TypeError):
            return [], "import_invalid_date"

        documents.append(
            {
                "locations": entry["locations"],
                "garbage_type": entry["garbage_type"],
                "date": parsed_date,
            }
        )

    return documents, None


@PyroClient.on_message(
    ~filters.scheduled & filters.private & custom_filters.owner & filters.command(["import"], prefixes=["/"]) & ~custom_filters.context  # type: ignore
)
async def command_import(app: PyroClient, message: Message):
    """Handle ``/import``: load entries from a JSON document into the database.

    Asks the user for a file, waits up to 300 seconds per attempt for an
    answer, and keeps asking until a document with the ``application/json``
    MIME type arrives. A timeout or a ``/cancel`` reply aborts the command.
    The document is then decoded, validated and inserted into
    ``col_entries``; any validation failure is reported to the user and
    nothing is inserted.

    Args:
        app: The client that received the command.
        message: The message containing the ``/import`` command.
    """
    user = await app.find_user(message.from_user)

    await message.reply_text(
        app._("import", "messages", locale=user.locale),
        reply_markup=ForceReply(
            placeholder=app._("import", "force_replies", locale=user.locale)
        ),
    )

    while True:
        # The user id is kept in ``app.contexts`` only while waiting for the
        # answer (presumably this is what ``custom_filters.context`` checks).
        # ``finally`` guarantees the id is removed even if listening fails.
        app.contexts.append(message.from_user.id)
        try:
            answer = await listen_message(app, message.chat.id, 300)
        finally:
            app.contexts.remove(message.from_user.id)

        if answer is None or answer.text == "/cancel":
            await message.reply_text(
                app._("cancelled", "messages", locale=user.locale),
                reply_markup=ReplyKeyboardRemove(),
            )
            return

        if answer.document is None or answer.document.mime_type != "application/json":
            await answer.reply_text(
                app._("import_invalid_filetype", "messages", locale=user.locale).format(
                    cancel_notice=app._("cancel", "messages", locale=user.locale)
                )
            )
            continue

        break

    file = await app.download_media(answer, in_memory=True)

    try:
        entries = loads(bytes(file.getbuffer()))  # type: ignore
    except ValueError:
        # A correct MIME type does not guarantee well-formed JSON.
        await answer.reply_text(
            app._("import_invalid", "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    entries_clean, error_key = _validate_entries(entries)

    if error_key is not None:
        await answer.reply_text(
            app._(error_key, "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    # NOTE(review): PyMongo-style ``insert_many`` raises on an empty document
    # list, so an empty import skips the insert and simply reports a count
    # of 0 - confirm this matches the driver behind ``col_entries``.
    if entries_clean:
        await col_entries.insert_many(entries_clean)

    await answer.reply_text(
        app._("import_finished", "messages", locale=user.locale).format(
            count=len(entries_clean)
        ),
        reply_markup=ReplyKeyboardRemove(),
    )