TelegramBot/classes/importer/json.py

57 lines
1.8 KiB
Python

from datetime import datetime
from typing import Any, Dict, List, Union
from bson import ObjectId
from ujson import loads
from classes.importer.abstract import Importer
from modules.database import col_entries
class ImporterJSON(Importer):
"""
The ImporterJSON class represents the object with
functionality to import/export garbage collection
records and convert them to other object types
from JSON files.
"""
def __init__(self):
super(Importer, self).__init__()
async def import_data(self, data: bytes) -> List[ObjectId]:
entries: List[Dict[str, Any]] = loads(data)
for entry in entries:
for key in ("locations", "garbage_type", "date"):
if (
key not in entry
or (key == "garbage_type" and not isinstance(entry[key], int))
or (key == "locations" and not isinstance(entry[key], list))
):
print("keys", entry)
raise ValueError
if key == "date":
try:
datetime.fromisoformat(str(entry[key]))
except (ValueError, TypeError) as exc:
print("date", entry)
raise ValueError from exc
entries_clean: List[Dict[str, Union[str, int, datetime]]] = [
{
"locations": entry["locations"],
"garbage_type": entry["garbage_type"],
"date": datetime.fromisoformat(str(entry["date"])),
}
for entry in entries
]
inserted = await col_entries.insert_many(entries_clean)
return [] if inserted is None else inserted.inserted_ids
async def export_data(self, data: Any) -> Any:
return None