import re
from datetime import datetime, timedelta
from typing import Literal, Union

from fastapi import HTTPException
from fastapi.responses import UJSONResponse
from pymongo import ASCENDING

# EntrySearchQueryEmptyError is no longer raised in this module (see
# entry_find); the import is retained so nothing relying on it breaks.
from classes.exceptions import (
    EntrySearchQueryEmptyError,
    LocationNotFoundError,
    LocationSearchQueryEmptyError,
    SearchPageInvalidError,
)
from classes.models import Location, SearchResultsCollectionEntry, SearchResultsLocation

# NOTE(review): the module path of this import was missing in the reviewed
# source (it read `from import app`). `modules.app` is presumed by analogy
# with `modules.database` below -- confirm before merging.
from modules.app import app
from modules.database import col_entries, col_locations

# Radius used for coordinate searches when the caller does not pass `radius`.
# It is forwarded to MongoDB's `$maxDistance` (meters for GeoJSON points
# according to the MongoDB documentation).
DEFAULT_SEARCH_RADIUS = 5000

# Width of the date window used by entry_find when `date_end` is omitted.
DEFAULT_ENTRY_WINDOW = timedelta(days=30)


def _page_offset(page: int, page_size: int) -> int:
    """Return how many documents to skip for the given 1-based page.

    Raises:
        SearchPageInvalidError: If ``page`` or ``page_size`` is not positive.
    """
    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()
    return (page - 1) * page_size


def _parse_iso_datetime(value: str, field: str) -> datetime:
    """Parse an ISO 8601 query value, answering 422 instead of crashing.

    Args:
        value: Raw query-string value.
        field: Name of the query parameter, used in the error message.

    Raises:
        HTTPException: 422 if ``value`` is not accepted by
            ``datetime.fromisoformat``.
    """
    try:
        return datetime.fromisoformat(value)
    except ValueError as exc:
        raise HTTPException(
            status_code=422,
            detail=f"Query parameter '{field}' must be an ISO 8601 date or datetime",
        ) from exc


location_get_responses = {
    404: LocationNotFoundError(0).openapi,
}


@app.get(
    "/locations/{id}",
    description="Get a location by id",
    response_model=Location,
    response_class=UJSONResponse,
    responses=location_get_responses,  # type: ignore
)
async def location_get(
    id: int,  # shadows the builtin, but it is the public path-parameter name
):
    """Return the location document whose ``id`` field equals ``id``.

    Raises:
        LocationNotFoundError: If no such location exists.
    """
    location = await col_locations.find_one({"id": id})

    if location is None:
        raise LocationNotFoundError(id)

    # Drop MongoDB's internal `_id` key before serializing the document.
    location.pop("_id", None)

    return UJSONResponse(location)


location_find_responses = {
    400: SearchPageInvalidError().openapi,
    422: LocationSearchQueryEmptyError().openapi,
}


@app.get(
    "/locations",
    description="Find a location by name or coordinates",
    response_class=UJSONResponse,
    response_model=SearchResultsLocation,
    responses=location_find_responses,  # type: ignore
)
async def location_find(
    q: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
    lat: Union[float, None] = None,
    lng: Union[float, None] = None,
    radius: Union[int, None] = None,
):
    """Search locations by coordinates or by name.

    When both ``lat`` and ``lng`` are given, a ``$nearSphere`` query around
    that point is used and ``q`` is ignored. Otherwise ``q`` is compiled as a
    regular expression and matched against the ``name`` field.

    Args:
        q: Regular expression matched against location names.
        page: 1-based page number.
        page_size: Maximum number of results per page.
        lat: Latitude of the search centre.
        lng: Longitude of the search centre.
        radius: Maximum distance from the centre; defaults to
            ``DEFAULT_SEARCH_RADIUS``.

    Raises:
        SearchPageInvalidError: If ``page`` or ``page_size`` is not positive.
        LocationSearchQueryEmptyError: If neither a full coordinate pair nor
            ``q`` is provided.
        HTTPException: 422 if ``q`` is not a valid regular expression.
    """
    skip = _page_offset(page, page_size)

    if lat is not None and lng is not None:
        db_query = {
            "location": {
                "$nearSphere": {
                    # GeoJSON order is [longitude, latitude].
                    "$geometry": {"type": "Point", "coordinates": [lng, lat]},
                    "$maxDistance": DEFAULT_SEARCH_RADIUS if radius is None else radius,
                }
            },
        }
    elif q is not None:
        # NOTE(review): `q` is a caller-supplied regular expression that ends
        # up in the database query. Regex semantics are kept for backward
        # compatibility; consider `re.escape(q)` if only substring search is
        # intended.
        try:
            db_query = {"name": re.compile(q)}
        except re.error as exc:
            # A malformed pattern is a client error, not a server failure.
            raise HTTPException(
                status_code=422,
                detail="Query parameter 'q' is not a valid regular expression",
            ) from exc
    else:
        raise LocationSearchQueryEmptyError()

    results = [
        {
            "id": location["id"],
            "name": location["name"],
            "location": location["location"],
            "country": location["country"],
            "timezone": location["timezone"],
        }
        async for location in col_locations.find(db_query, limit=page_size, skip=skip)
    ]

    return UJSONResponse({"results": results})


entry_find_responses = {
    400: SearchPageInvalidError().openapi,
    404: LocationNotFoundError(0).openapi,
    422: LocationSearchQueryEmptyError().openapi,
}


@app.get(
    "/locations/{location}/entries",
    description="Find entries by date(s) or type",
    response_class=UJSONResponse,
    response_model=SearchResultsCollectionEntry,
    responses=entry_find_responses,  # type: ignore
)
async def entry_find(
    location: int,
    garbage_type: Union[Literal[0, 1, 2, 3, 4, 5], None] = None,
    # NOTE(review): the default expressions of both date parameters were
    # missing in the reviewed source (`date_start: str =,` and
    # `( + timedelta(days=30)).isoformat()`). They are presumed to have been
    # "now" and "now + 30 days". Computing them in the signature would freeze
    # them at import time, so None is used and resolved per request instead.
    date_start: Union[str, None] = None,
    date_end: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
):
    """List entries of a location within a date window, oldest first.

    Args:
        location: ``id`` of the location whose entries are requested.
        garbage_type: Optional filter on the entry's ``garbage_type``.
        date_start: ISO 8601 lower bound (inclusive); defaults to the time of
            the request.
        date_end: ISO 8601 upper bound (inclusive); defaults to 30 days after
            the time of the request.
        page: 1-based page number.
        page_size: Maximum number of results per page.

    Raises:
        LocationNotFoundError: If the location does not exist.
        SearchPageInvalidError: If ``page`` or ``page_size`` is not positive.
        HTTPException: 422 if a date parameter is not valid ISO 8601.
    """
    if (await col_locations.find_one({"id": location})) is None:
        raise LocationNotFoundError(location)

    skip = _page_offset(page, page_size)

    # NOTE(review): naive local time, matching the presumed original default;
    # confirm it agrees with how `date` is stored in the entries collection.
    now = datetime.now()
    date_start_dt = (
        now if date_start is None else _parse_iso_datetime(date_start, "date_start")
    )
    date_end_dt = (
        now + DEFAULT_ENTRY_WINDOW
        if date_end is None
        else _parse_iso_datetime(date_end, "date_end")
    )

    # The former "empty query" check was unreachable because both dates
    # always have a value, so it has been removed.
    db_query = {
        "locations": location,
        "date": {"$gte": date_start_dt, "$lte": date_end_dt},
    }
    if garbage_type is not None:
        db_query["garbage_type"] = garbage_type

    cursor = col_entries.find(db_query, limit=page_size, skip=skip).sort(
        key="date", direction=ASCENDING
    )
    results = [
        {
            "locations": entry["locations"],
            "garbage_type": entry["garbage_type"],
            "date": entry["date"].isoformat(),
        }
        async for entry in cursor
    ]

    return UJSONResponse({"results": results})