75 lines
2.5 KiB
Python
75 lines
2.5 KiB
Python
from datetime import datetime
|
|
from typing import List, Mapping, Union
|
|
|
|
from convopyro import listen_message
|
|
from pyrogram import filters
|
|
from pyrogram.types import Message
|
|
from ujson import loads
|
|
|
|
from classes.pyroclient import PyroClient
|
|
from modules import custom_filters
|
|
from modules.database import col_entries
|
|
|
|
|
|
@PyroClient.on_message(
|
|
~filters.scheduled & filters.private & custom_filters.owner & filters.command(["import"], prefixes=["/"]) # type: ignore
|
|
)
|
|
async def command_import(app: PyroClient, message: Message):
|
|
await message.reply_text("Alright. Send me a valid JSON.")
|
|
|
|
while True:
|
|
answer = await listen_message(app, message.chat.id, 300)
|
|
|
|
if answer is None or answer.text == "/cancel":
|
|
await message.reply_text("Cancelled.")
|
|
return
|
|
|
|
if answer.document is None or answer.document.mime_type != "application/json":
|
|
await answer.reply_text(
|
|
"Invalid input. Please, send me a JSON file with entries."
|
|
)
|
|
continue
|
|
|
|
break
|
|
|
|
file = await app.download_media(answer, in_memory=True)
|
|
|
|
entries: List[Mapping[str, Union[str, int]]] = loads(bytes(file.getbuffer())) # type: ignore
|
|
|
|
for entry in entries:
|
|
if not isinstance(entries, list):
|
|
await answer.reply_text("This is not a valid garbage collection JSON.")
|
|
return
|
|
|
|
for key in ("locations", "garbage_type", "date"):
|
|
if (
|
|
key not in entry
|
|
or (key == "garbage_type" and not isinstance(entry[key], int))
|
|
or (key == "locations" and not isinstance(entry[key], list))
|
|
):
|
|
await answer.reply_text("This is not a valid garbage collection JSON.")
|
|
return
|
|
if key == "date":
|
|
try:
|
|
datetime.fromisoformat(str(entry[key]))
|
|
except (ValueError, TypeError):
|
|
await answer.reply_text(
|
|
"Entries contain invalid date formats. Use **ISO 8601** date format."
|
|
)
|
|
return
|
|
|
|
entries_clean: List[Mapping[str, Union[str, int, datetime]]] = [
|
|
{
|
|
"locations": entry["locations"],
|
|
"garbage_type": entry["garbage_type"],
|
|
"date": datetime.fromisoformat(str(entry["date"])),
|
|
}
|
|
for entry in entries
|
|
]
|
|
|
|
await col_entries.insert_many(entries_clean)
|
|
|
|
await answer.reply_text(
|
|
f"You have successfully inserted {len(entries_clean)} entries."
|
|
)
|