from datetime import datetime
from typing import List, Mapping, Union

from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
from ujson import loads

from classes.pyroclient import PyroClient
from modules import custom_filters
from modules.database import col_entries


class _ImportValidationError(ValueError):
    """Signal that an uploaded import file failed validation.

    Attributes:
        message_key: Key in the "messages" locale section of the reply that
            should be sent to the user for this failure.
    """

    def __init__(self, message_key: str) -> None:
        super().__init__(message_key)
        self.message_key = message_key


def _parse_entries(
    payload: bytes,
) -> List[Mapping[str, Union[int, list, datetime]]]:
    """Decode and validate an import file, returning insert-ready entries.

    The payload must be a JSON array of objects, each holding "locations"
    (a list), "garbage_type" (an integer) and "date" (an ISO 8601 string).
    Any other keys are dropped from the returned entries.

    Args:
        payload: Raw bytes of the uploaded JSON document.

    Returns:
        One mapping per input entry with "date" converted to ``datetime``.
        An empty JSON array yields an empty list.

    Raises:
        _ImportValidationError: With ``message_key`` "import_invalid" when the
            JSON cannot be decoded or the structure is wrong, and
            "import_invalid_date" when a date cannot be parsed.
    """
    try:
        raw = loads(payload)
    except ValueError as exc:
        # Malformed JSON (or undecodable bytes) is treated like any other
        # structurally invalid file.
        raise _ImportValidationError("import_invalid") from exc

    # Check the container type before iterating: a scalar is not iterable and
    # iterating a dict would silently walk over its keys.
    if not isinstance(raw, list):
        raise _ImportValidationError("import_invalid")

    cleaned: List[Mapping[str, Union[int, list, datetime]]] = []

    for entry in raw:
        # Without this check, "key in entry" would be a substring test on
        # strings and a TypeError on numbers.
        if not isinstance(entry, dict):
            raise _ImportValidationError("import_invalid")

        if not isinstance(entry.get("locations"), list):
            raise _ImportValidationError("import_invalid")

        garbage_type = entry.get("garbage_type")
        # bool is a subclass of int; JSON true/false is not a valid type id.
        if isinstance(garbage_type, bool) or not isinstance(garbage_type, int):
            raise _ImportValidationError("import_invalid")

        if "date" not in entry:
            raise _ImportValidationError("import_invalid")

        try:
            date = datetime.fromisoformat(str(entry["date"]))
        except (ValueError, TypeError) as exc:
            raise _ImportValidationError("import_invalid_date") from exc

        cleaned.append(
            {
                "locations": entry["locations"],
                "garbage_type": garbage_type,
                "date": date,
            }
        )

    return cleaned


@PyroClient.on_message(
    ~filters.scheduled
    & filters.private
    & custom_filters.owner
    & filters.command(["import"], prefixes=["/"])
    & ~custom_filters.context  # type: ignore
)
async def command_import(app: PyroClient, message: Message):
    """Handle "/import": ask for a JSON file and insert its entries.

    The handler prompts the user for a document, keeps asking until a file
    with the "application/json" MIME type arrives (or the user sends
    "/cancel" / no answer is returned), validates the content and inserts the
    cleaned entries into ``col_entries``.

    Args:
        app: Client instance that received the command.
        message: The message containing the "/import" command.
    """
    user = await app.find_user(message.from_user)

    await message.reply_text(
        app._("import", "messages", locale=user.locale),
        reply_markup=ForceReply(
            placeholder=app._("import", "force_replies", locale=user.locale)
        ),
    )

    while True:
        # The user id is kept in app.contexts only while waiting for the
        # answer (presumably what custom_filters.context inspects - confirm).
        # try/finally guarantees it is removed even if listening fails.
        app.contexts.append(message.from_user.id)
        try:
            answer = await listen_message(app, message.chat.id, 300)
        finally:
            app.contexts.remove(message.from_user.id)

        if answer is None or answer.text == "/cancel":
            await message.reply_text(
                app._("cancelled", "messages", locale=user.locale),
                reply_markup=ReplyKeyboardRemove(),
            )
            return

        if answer.document is None or answer.document.mime_type != "application/json":
            await answer.reply_text(
                app._("import_invalid_filetype", "messages", locale=user.locale).format(
                    cancel_notice=app._("cancel", "messages", locale=user.locale)
                )
            )
            continue

        break

    file = await app.download_media(answer, in_memory=True)

    try:
        entries_clean = _parse_entries(bytes(file.getbuffer()))  # type: ignore
    except _ImportValidationError as exc:
        await answer.reply_text(
            app._(exc.message_key, "messages", locale=user.locale),
            reply_markup=ReplyKeyboardRemove(),
        )
        return

    # Skip the insert for an empty file: bulk inserts with no documents are
    # rejected by the database driver.
    if entries_clean:
        await col_entries.insert_many(entries_clean)

    await answer.reply_text(
        app._("import_finished", "messages", locale=user.locale).format(
            count=len(entries_clean)
        ),
        reply_markup=ReplyKeyboardRemove(),
    )