Initial commit
This commit is contained in:
73
extensions/exceptions.py
Normal file
73
extensions/exceptions.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from fastapi import Request
|
||||
from fastapi.responses import UJSONResponse
|
||||
from starlette.status import (
|
||||
HTTP_400_BAD_REQUEST,
|
||||
HTTP_401_UNAUTHORIZED,
|
||||
HTTP_404_NOT_FOUND,
|
||||
HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
)
|
||||
|
||||
from classes.exceptions import (
|
||||
EntrySearchQueryEmptyError,
|
||||
LocationNotFoundError,
|
||||
LocationSearchQueryEmptyError,
|
||||
SearchLimitInvalidError,
|
||||
SearchPageInvalidError,
|
||||
)
|
||||
from modules.app import app
|
||||
|
||||
|
||||
@app.exception_handler(LocationNotFoundError)
|
||||
async def location_not_found_exception_handler(
|
||||
request: Request, exc: LocationNotFoundError
|
||||
):
|
||||
return UJSONResponse(
|
||||
status_code=HTTP_404_NOT_FOUND,
|
||||
content={"detail": f"Could not find location with id '{exc.id}'."},
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(EntrySearchQueryEmptyError)
|
||||
async def entry_search_query_empty_exception_handler(
|
||||
request: Request, exc: EntrySearchQueryEmptyError
|
||||
):
|
||||
return UJSONResponse(
|
||||
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
content={
|
||||
"detail": "You must provide location and date(s) to look for entries."
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(LocationSearchQueryEmptyError)
|
||||
async def location_search_query_empty_exception_handler(
|
||||
request: Request, exc: LocationSearchQueryEmptyError
|
||||
):
|
||||
return UJSONResponse(
|
||||
status_code=HTTP_422_UNPROCESSABLE_ENTITY,
|
||||
content={
|
||||
"detail": "You must provide name or coordinates to look for location."
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(SearchLimitInvalidError)
|
||||
async def search_limit_invalid_exception_handler(
|
||||
request: Request, exc: SearchLimitInvalidError
|
||||
):
|
||||
return UJSONResponse(
|
||||
status_code=HTTP_401_UNAUTHORIZED,
|
||||
content={"detail": "Parameter 'limit' must be greater or equal to 1."},
|
||||
)
|
||||
|
||||
|
||||
@app.exception_handler(SearchPageInvalidError)
|
||||
async def search_page_invalid_exception_handler(
|
||||
request: Request, exc: SearchPageInvalidError
|
||||
):
|
||||
return UJSONResponse(
|
||||
status_code=HTTP_400_BAD_REQUEST,
|
||||
content={
|
||||
"detail": "Parameters 'page' and 'page_size' must be greater or equal to 1."
|
||||
},
|
||||
)
|
169
extensions/locations.py
Normal file
169
extensions/locations.py
Normal file
@@ -0,0 +1,169 @@
|
||||
import re
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Literal, Union
|
||||
|
||||
from fastapi.responses import UJSONResponse
|
||||
from pymongo import ASCENDING
|
||||
|
||||
from classes.exceptions import (
|
||||
EntrySearchQueryEmptyError,
|
||||
LocationNotFoundError,
|
||||
LocationSearchQueryEmptyError,
|
||||
SearchPageInvalidError,
|
||||
)
|
||||
from classes.models import Location, SearchResultsCollectionEntry, SearchResultsLocation
|
||||
from modules.app import app
|
||||
from modules.database import col_entries, col_locations
|
||||
|
||||
location_get_responses = {
|
||||
404: LocationNotFoundError(0).openapi,
|
||||
}
|
||||
|
||||
|
||||
@app.get(
|
||||
"/locations/{id}",
|
||||
description="Get a location by id",
|
||||
response_model=Location,
|
||||
response_class=UJSONResponse,
|
||||
responses=location_get_responses, # type: ignore
|
||||
)
|
||||
async def location_get(
|
||||
id: int,
|
||||
):
|
||||
location = await col_locations.find_one({"id": id})
|
||||
|
||||
if location is None:
|
||||
raise LocationNotFoundError(id)
|
||||
|
||||
del location["_id"] # type: ignore
|
||||
|
||||
return UJSONResponse(location)
|
||||
|
||||
|
||||
location_find_responses = {
|
||||
400: SearchPageInvalidError().openapi,
|
||||
422: LocationSearchQueryEmptyError().openapi,
|
||||
}
|
||||
|
||||
|
||||
@app.get(
|
||||
"/locations",
|
||||
description="Find a location by name or coordinates",
|
||||
response_class=UJSONResponse,
|
||||
response_model=SearchResultsLocation,
|
||||
responses=location_find_responses, # type: ignore
|
||||
)
|
||||
async def location_find(
|
||||
q: Union[str, None] = None,
|
||||
page: int = 1,
|
||||
page_size: int = 100,
|
||||
lat: Union[float, None] = None,
|
||||
lng: Union[float, None] = None,
|
||||
radius: Union[int, None] = None,
|
||||
):
|
||||
if page <= 0 or page_size <= 0:
|
||||
raise SearchPageInvalidError()
|
||||
|
||||
output = {"results": []}
|
||||
skip = (page - 1) * page_size
|
||||
|
||||
radius = 5000 if radius is None else radius
|
||||
|
||||
if (lat is not None) and (lng is not None):
|
||||
db_query = {
|
||||
"location": {
|
||||
"$nearSphere": {
|
||||
"$geometry": {"type": "Point", "coordinates": [lng, lat]},
|
||||
"$maxDistance": radius,
|
||||
}
|
||||
},
|
||||
}
|
||||
elif q is not None:
|
||||
db_query = {"name": re.compile(q)}
|
||||
else:
|
||||
raise LocationSearchQueryEmptyError()
|
||||
|
||||
locations = [
|
||||
location
|
||||
async for location in (col_locations.find(db_query, limit=page_size, skip=skip))
|
||||
]
|
||||
|
||||
for location in locations:
|
||||
output["results"].append(
|
||||
{
|
||||
"id": location["id"],
|
||||
"name": location["name"],
|
||||
"location": location["location"],
|
||||
"country": location["country"],
|
||||
"timezone": location["timezone"],
|
||||
}
|
||||
)
|
||||
|
||||
return UJSONResponse(output)
|
||||
|
||||
|
||||
entry_find_responses = {
|
||||
400: SearchPageInvalidError().openapi,
|
||||
404: LocationNotFoundError(0).openapi,
|
||||
422: LocationSearchQueryEmptyError().openapi,
|
||||
}
|
||||
|
||||
|
||||
@app.get(
|
||||
"/locations/{location}/entries",
|
||||
description="Find entries by date(s) or type",
|
||||
response_class=UJSONResponse,
|
||||
response_model=SearchResultsCollectionEntry,
|
||||
responses=entry_find_responses, # type: ignore
|
||||
)
|
||||
async def entry_find(
|
||||
location: int,
|
||||
garbage_type: Union[Literal[0, 1, 2, 3, 4, 5], None] = None,
|
||||
date_start: str = datetime.now().isoformat(),
|
||||
date_end: str = (datetime.now() + timedelta(days=30)).isoformat(),
|
||||
page: int = 1,
|
||||
page_size: int = 100,
|
||||
):
|
||||
if (await col_locations.find_one({"id": location})) is None:
|
||||
raise LocationNotFoundError(location)
|
||||
|
||||
if page <= 0 or page_size <= 0:
|
||||
raise SearchPageInvalidError()
|
||||
|
||||
output = {"results": []}
|
||||
skip = (page - 1) * page_size
|
||||
|
||||
date_start_dt = datetime.fromisoformat(date_start)
|
||||
date_end_dt = datetime.fromisoformat(date_end)
|
||||
|
||||
if garbage_type is None and date_start_dt is None and date_end_dt is None:
|
||||
raise EntrySearchQueryEmptyError()
|
||||
elif garbage_type is None:
|
||||
db_query = {
|
||||
"locations": location,
|
||||
"date": {"$gte": date_start_dt, "$lte": date_end_dt},
|
||||
}
|
||||
else:
|
||||
db_query = {
|
||||
"locations": location,
|
||||
"garbage_type": garbage_type,
|
||||
"date": {"$gte": date_start_dt, "$lte": date_end_dt},
|
||||
}
|
||||
|
||||
entries = [
|
||||
entry
|
||||
async for entry in col_entries.find(db_query, limit=page_size, skip=skip).sort(
|
||||
key="date", direction=ASCENDING
|
||||
)
|
||||
]
|
||||
|
||||
for entry in entries:
|
||||
output["results"].append(
|
||||
{
|
||||
"locations": entry["locations"],
|
||||
"garbage_type": entry["garbage_type"],
|
||||
"date": entry["date"].isoformat(),
|
||||
}
|
||||
)
|
||||
|
||||
return UJSONResponse(output)
|
Reference in New Issue
Block a user