170 lines
4.5 KiB
Python
170 lines
4.5 KiB
Python
import re
|
|
from datetime import datetime, timedelta
|
|
from typing import Literal, Union
|
|
|
|
from fastapi.responses import UJSONResponse
|
|
from pymongo import ASCENDING
|
|
|
|
from classes.exceptions import (
|
|
EntrySearchQueryEmptyError,
|
|
LocationNotFoundError,
|
|
LocationSearchQueryEmptyError,
|
|
SearchPageInvalidError,
|
|
)
|
|
from classes.models import Location, SearchResultsCollectionEntry, SearchResultsLocation
|
|
from modules.app import app
|
|
from modules.database import col_entries, col_locations
|
|
|
|
# Extra OpenAPI response documentation attached to GET /locations/{id}.
# The placeholder id 0 is only used to build the error's OpenAPI description.
location_get_responses = {404: LocationNotFoundError(0).openapi}
|
|
|
|
|
|
@app.get(
    "/locations/{id}",
    description="Get a location by id",
    response_model=Location,
    response_class=UJSONResponse,
    responses=location_get_responses,  # type: ignore
)
async def location_get(
    id: int,
):
    """Return the location document whose ``id`` field matches the path value.

    Args:
        id: Value looked up in the ``id`` field of the locations collection.

    Returns:
        A ``UJSONResponse`` with the stored document, minus its ``_id`` key.

    Raises:
        LocationNotFoundError: If no document with that ``id`` exists.
    """
    document = await col_locations.find_one({"id": id})

    if document is not None:
        # The database's internal "_id" key is stripped before responding
        # (presumably because it is not JSON-serializable -- TODO confirm).
        del document["_id"]  # type: ignore
        return UJSONResponse(document)

    raise LocationNotFoundError(id)
|
|
|
|
|
|
# Extra OpenAPI response documentation attached to GET /locations.
location_find_responses = {
    # Non-positive ``page`` or ``page_size``.
    400: SearchPageInvalidError().openapi,
    # Neither a complete coordinate pair nor a name query was supplied.
    422: LocationSearchQueryEmptyError().openapi,
}
|
|
|
|
|
|
@app.get(
    "/locations",
    description="Find a location by name or coordinates",
    response_class=UJSONResponse,
    response_model=SearchResultsLocation,
    responses=location_find_responses,  # type: ignore
)
async def location_find(
    q: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
    lat: Union[float, None] = None,
    lng: Union[float, None] = None,
    radius: Union[int, None] = None,
):
    """Search locations either around a coordinate pair or by name pattern.

    A complete ``lat``/``lng`` pair takes precedence over ``q``. If only one
    of the two coordinates is given, the pair is ignored and ``q`` is used.

    Args:
        q: Pattern compiled with ``re.compile`` and matched against ``name``.
        page: 1-based page number.
        page_size: Maximum number of results per page.
        lat: Latitude of the search centre.
        lng: Longitude of the search centre.
        radius: ``$maxDistance`` for the geo query; 5000 when omitted.

    Returns:
        A ``UJSONResponse`` of the form ``{"results": [...]}`` where each item
        carries ``id``, ``name``, ``location``, ``country`` and ``timezone``.

    Raises:
        SearchPageInvalidError: If ``page`` or ``page_size`` is not positive.
        LocationSearchQueryEmptyError: If neither a full coordinate pair nor
            ``q`` was supplied.
    """
    if not (page > 0 and page_size > 0):
        raise SearchPageInvalidError()

    by_coordinates = lat is not None and lng is not None

    if not by_coordinates and q is None:
        raise LocationSearchQueryEmptyError()

    if by_coordinates:
        max_distance = radius if radius is not None else 5000
        # GeoJSON points are ordered longitude first, then latitude.
        centre = {"type": "Point", "coordinates": [lng, lat]}
        db_query = {
            "location": {
                "$nearSphere": {
                    "$geometry": centre,
                    "$maxDistance": max_distance,
                }
            },
        }
    else:
        # NOTE(review): ``q`` is compiled as a regular expression verbatim, so
        # an invalid pattern raises ``re.error`` here and callers can submit
        # arbitrary regexes -- confirm this is intended.
        db_query = {"name": re.compile(q)}

    # NOTE(review): no explicit sort is applied, so page contents for name
    # searches depend on the database's natural order -- confirm acceptable.
    cursor = col_locations.find(
        db_query, limit=page_size, skip=(page - 1) * page_size
    )

    exposed_fields = ("id", "name", "location", "country", "timezone")
    results = [
        {field: document[field] for field in exposed_fields}
        async for document in cursor
    ]

    return UJSONResponse({"results": results})
|
|
|
|
|
|
# Extra OpenAPI response documentation attached to
# GET /locations/{location}/entries.
entry_find_responses = {
    # Non-positive ``page`` or ``page_size``.
    400: SearchPageInvalidError().openapi,
    # Unknown location id; the placeholder 0 only builds the description.
    404: LocationNotFoundError(0).openapi,
    # NOTE(review): this documents LocationSearchQueryEmptyError, which the
    # entry search handler in this file never raises -- confirm whether
    # EntrySearchQueryEmptyError was intended here.
    422: LocationSearchQueryEmptyError().openapi,
}
|
|
|
|
|
|
@app.get(
    "/locations/{location}/entries",
    description="Find entries by date(s) or type",
    response_class=UJSONResponse,
    response_model=SearchResultsCollectionEntry,
    responses=entry_find_responses,  # type: ignore
)
async def entry_find(
    location: int,
    garbage_type: Union[Literal[0, 1, 2, 3, 4, 5], None] = None,
    date_start: Union[str, None] = None,
    date_end: Union[str, None] = None,
    page: int = 1,
    page_size: int = 100,
):
    """Find collection entries of a location within a date window.

    Args:
        location: Location id; must exist in the locations collection.
        garbage_type: Optional filter on the entry's ``garbage_type``.
        date_start: ISO 8601 lower bound (inclusive). Defaults to the current
            time of the request.
        date_end: ISO 8601 upper bound (inclusive). Defaults to 30 days after
            the current time of the request.
        page: 1-based page number.
        page_size: Maximum number of results per page.

    Returns:
        A ``UJSONResponse`` of the form ``{"results": [...]}``, sorted by
        ``date`` ascending, where each item carries ``locations``,
        ``garbage_type`` and an ISO-formatted ``date``.

    Raises:
        LocationNotFoundError: If ``location`` does not exist.
        SearchPageInvalidError: If ``page`` or ``page_size`` is not positive.
        ValueError: If ``date_start`` or ``date_end`` is not a valid ISO
            string (propagated from ``datetime.fromisoformat``).
    """
    if (await col_locations.find_one({"id": location})) is None:
        raise LocationNotFoundError(location)

    if page <= 0 or page_size <= 0:
        raise SearchPageInvalidError()

    # Defaults are resolved per request. Computing them in the signature would
    # freeze the window at import time for the lifetime of the process.
    # NOTE(review): naive local time is used, as before -- confirm that stored
    # dates use the same convention.
    now = datetime.now()
    date_start_dt = (
        now if date_start is None else datetime.fromisoformat(date_start)
    )
    date_end_dt = (
        now + timedelta(days=30)
        if date_end is None
        else datetime.fromisoformat(date_end)
    )

    # The date window always has a value at this point, so the query can never
    # be empty; only the garbage type filter is optional.
    db_query = {"locations": location}
    if garbage_type is not None:
        db_query["garbage_type"] = garbage_type
    db_query["date"] = {"$gte": date_start_dt, "$lte": date_end_dt}

    cursor = col_entries.find(
        db_query, limit=page_size, skip=(page - 1) * page_size
    ).sort(key="date", direction=ASCENDING)

    results = [
        {
            "locations": entry["locations"],
            "garbage_type": entry["garbage_type"],
            "date": entry["date"].isoformat(),
        }
        async for entry in cursor
    ]

    return UJSONResponse({"results": results})
|