TelegramBot/classes/importer/json.py

57 lines
1.8 KiB
Python

from datetime import datetime
from typing import Any, Dict, List, Union
from bson import ObjectId
from ujson import loads
from classes.importer.abstract import Importer
from modules.database_api import col_entries
class ImporterJSON(Importer):
    """
    Importer for garbage collection records stored as JSON.

    The expected payload is a JSON array of objects, each carrying the
    keys ``locations`` (list), ``garbage_type`` (int) and ``date``
    (a value whose string form is an ISO 8601 date/datetime).
    Validated records are inserted into ``col_entries``.
    """

    def __init__(self):
        # Zero-argument super() so that Importer.__init__ itself runs;
        # super(Importer, self) would start the lookup *after* Importer
        # in the MRO and silently skip it.
        super().__init__()

    @staticmethod
    def _parse_entry(entry: Any) -> Dict[str, Any]:
        """Validate one raw record and return the document to insert.

        Args:
            entry: A single decoded element of the JSON array.

        Returns:
            A dict with exactly the keys ``locations``, ``garbage_type``
            and ``date``, where ``date`` has been converted to ``datetime``.

        Raises:
            ValueError: If the entry is not an object, a required key is
                missing or has the wrong type, or the date cannot be parsed.
        """
        if not isinstance(entry, dict):
            print("keys", entry)
            raise ValueError(f"Entry must be a JSON object: {entry!r}")

        # Keys are checked in a fixed order so the first problem reported
        # is deterministic: locations, garbage_type, date.
        if "locations" not in entry or not isinstance(entry["locations"], list):
            print("keys", entry)
            raise ValueError(f"Entry has missing or non-list 'locations': {entry!r}")

        garbage_type = entry.get("garbage_type")
        # bool is a subclass of int, so JSON true/false must be excluded
        # explicitly or it would be stored as a garbage type.
        if (
            "garbage_type" not in entry
            or isinstance(garbage_type, bool)
            or not isinstance(garbage_type, int)
        ):
            print("keys", entry)
            raise ValueError(
                f"Entry has missing or non-integer 'garbage_type': {entry!r}"
            )

        if "date" not in entry:
            print("keys", entry)
            raise ValueError(f"Entry has no 'date': {entry!r}")

        try:
            # Parse once: the result both validates the value and becomes
            # the stored field.
            parsed_date = datetime.fromisoformat(str(entry["date"]))
        except (ValueError, TypeError) as exc:
            print("date", entry)
            raise ValueError(f"Entry has an invalid 'date': {entry!r}") from exc

        return {
            "locations": entry["locations"],
            "garbage_type": garbage_type,
            "date": parsed_date,
        }

    async def import_data(self, data: bytes) -> List[ObjectId]:
        """Parse a JSON payload and insert its records into ``col_entries``.

        Every record is validated before anything is written, so a single
        malformed record aborts the import without inserting any of them.

        Args:
            data: Raw JSON document containing an array of records.

        Returns:
            The ids of the inserted documents; an empty list when the
            payload contains no records or the insert returns ``None``.

        Raises:
            ValueError: If the payload is not a JSON array or any record
                fails validation.
        """
        entries = loads(data)
        if not isinstance(entries, list):
            raise ValueError("JSON payload must be an array of entries")

        entries_clean: List[Dict[str, Any]] = [
            self._parse_entry(entry) for entry in entries
        ]

        # PyMongo-style drivers raise on an empty insert_many batch, so an
        # empty payload is treated as "nothing to import".
        if not entries_clean:
            return []

        inserted = await col_entries.insert_many(entries_clean)
        return [] if inserted is None else inserted.inserted_ids

    async def export_data(self, data: Any) -> Any:
        """Export is not implemented for JSON; always returns ``None``."""
        return None