API/extensions/locations.py
2023-12-14 01:18:57 +01:00

170 lines
4.5 KiB
Python

import re
from datetime import datetime, timedelta
from typing import Literal, Union
from fastapi.responses import UJSONResponse
from pymongo import ASCENDING
from classes.exceptions import (
EntrySearchQueryEmptyError,
LocationNotFoundError,
LocationSearchQueryEmptyError,
SearchPageInvalidError,
)
from classes.models import Location, SearchResultsCollectionEntry, SearchResultsLocation
from modules.app import app
from modules.database import col_entries, col_locations
location_get_responses = {
404: LocationNotFoundError(0).openapi,
}
@app.get(
"/locations/{id}",
description="Get a location by id",
response_model=Location,
response_class=UJSONResponse,
responses=location_get_responses, # type: ignore
)
async def location_get(
id: int,
):
location = await col_locations.find_one({"id": id})
if location is None:
raise LocationNotFoundError(id)
del location["_id"] # type: ignore
return UJSONResponse(location)
location_find_responses = {
400: SearchPageInvalidError().openapi,
422: LocationSearchQueryEmptyError().openapi,
}
@app.get(
"/locations",
description="Find a location by name or coordinates",
response_class=UJSONResponse,
response_model=SearchResultsLocation,
responses=location_find_responses, # type: ignore
)
async def location_find(
q: Union[str, None] = None,
page: int = 1,
page_size: int = 100,
lat: Union[float, None] = None,
lng: Union[float, None] = None,
radius: Union[int, None] = None,
):
if page <= 0 or page_size <= 0:
raise SearchPageInvalidError()
output = {"results": []}
skip = (page - 1) * page_size
radius = 5000 if radius is None else radius
if (lat is not None) and (lng is not None):
db_query = {
"location": {
"$nearSphere": {
"$geometry": {"type": "Point", "coordinates": [lng, lat]},
"$maxDistance": radius,
}
},
}
elif q is not None:
db_query = {"name": re.compile(q)}
else:
raise LocationSearchQueryEmptyError()
locations = [
location
async for location in (col_locations.find(db_query, limit=page_size, skip=skip))
]
for location in locations:
output["results"].append(
{
"id": location["id"],
"name": location["name"],
"location": location["location"],
"country": location["country"],
"timezone": location["timezone"],
}
)
return UJSONResponse(output)
entry_find_responses = {
400: SearchPageInvalidError().openapi,
404: LocationNotFoundError(0).openapi,
422: LocationSearchQueryEmptyError().openapi,
}
@app.get(
"/locations/{location}/entries",
description="Find entries by date(s) or type",
response_class=UJSONResponse,
response_model=SearchResultsCollectionEntry,
responses=entry_find_responses, # type: ignore
)
async def entry_find(
location: int,
garbage_type: Union[Literal[0, 1, 2, 3, 4, 5], None] = None,
date_start: str = datetime.now().isoformat(),
date_end: str = (datetime.now() + timedelta(days=30)).isoformat(),
page: int = 1,
page_size: int = 100,
):
if (await col_locations.find_one({"id": location})) is None:
raise LocationNotFoundError(location)
if page <= 0 or page_size <= 0:
raise SearchPageInvalidError()
output = {"results": []}
skip = (page - 1) * page_size
date_start_dt = datetime.fromisoformat(date_start)
date_end_dt = datetime.fromisoformat(date_end)
if garbage_type is None and date_start_dt is None and date_end_dt is None:
raise EntrySearchQueryEmptyError()
elif garbage_type is None:
db_query = {
"locations": location,
"date": {"$gte": date_start_dt, "$lte": date_end_dt},
}
else:
db_query = {
"locations": location,
"garbage_type": garbage_type,
"date": {"$gte": date_start_dt, "$lte": date_end_dt},
}
entries = [
entry
async for entry in col_entries.find(db_query, limit=page_size, skip=skip).sort(
key="date", direction=ASCENDING
)
]
for entry in entries:
output["results"].append(
{
"locations": entry["locations"],
"garbage_type": entry["garbage_type"],
"date": entry["date"].isoformat(),
}
)
return UJSONResponse(output)