"""FastAPI application object and API-key authentication helpers."""

from os import path
from typing import Union

from fastapi import FastAPI, HTTPException, Security
from fastapi.openapi.docs import get_redoc_html, get_swagger_ui_html
from fastapi.openapi.models import APIKey
from fastapi.security import APIKeyCookie, APIKeyHeader, APIKeyQuery
from starlette.status import HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN

from modules.database import col_apikeys, col_expired
from modules.security import passEncode
from modules.utils import configGet, jsonLoad

# The built-in docs routes are disabled here and re-registered at the bottom
# of this module with a custom title and favicon.
app = FastAPI(title="Stardew Sync", docs_url=None, redoc_url=None, version="0.1")

# The same "apikey" name is accepted from the query string, a header or a
# cookie. auto_error=False makes a missing credential resolve to None instead
# of FastAPI rejecting the request by itself, so get_api_key() decides.
api_key_query = APIKeyQuery(name="apikey", auto_error=False)
api_key_header = APIKeyHeader(name="apikey", auto_error=False)
api_key_cookie = APIKeyCookie(name="apikey", auto_error=False)


def get_all_api_keys() -> list:
    """Load ``api_keys.json`` from the configured data location."""
    return jsonLoad(path.join(configGet("data", "locations"), "api_keys.json"))


def get_all_expired_keys() -> list:
    """Load ``expired_keys.json`` from the configured data location."""
    return jsonLoad(path.join(configGet("data", "locations"), "expired_keys.json"))


# def check_project_key(project: str, apikey: APIKey) -> bool:
#     keys = jsonLoad(path.join(configGet("data", "locations"), "api_keys.json"))
#     if apikey in keys:
#         if keys[apikey] != []:
#             if project in keys[apikey]:
#                 return True
#             else:
#                 return False
#         else:
#             return False
#     else:
#         return False


async def get_api_key(
    api_key_query: str = Security(api_key_query),
    api_key_header: str = Security(api_key_header),
    api_key_cookie: str = Security(api_key_cookie),
) -> str:
    """Resolve the API key of the current request.

    The key is looked for in the query string, then the header, then the
    cookie; the first one whose hash is found in ``col_apikeys`` wins.

    Note that the parameter names intentionally shadow the module-level
    security schemes: the defaults are evaluated once, at definition time,
    against the module-level objects.

    Args:
        api_key_query: Key taken from the ``apikey`` query parameter, if any.
        api_key_header: Key taken from the ``apikey`` header, if any.
        api_key_cookie: Key taken from the ``apikey`` cookie, if any.

    Returns:
        The first supplied key that is registered as valid.

    Raises:
        HTTPException: 403 when no supplied key is valid but at least one is
            found in ``col_expired``; 401 when no supplied key is known at
            all (including when no key was supplied).
    """
    # Credentials that were not sent are skipped rather than hashed and
    # looked up: there is nothing to validate for them.
    supplied_keys = [
        key
        for key in (api_key_query, api_key_header, api_key_cookie)
        if key is not None
    ]

    # Hash every key once and remember the hash, so the expired-key lookup
    # below does not have to encode the same key a second time.
    rejected_hashes = []
    for key in supplied_keys:
        key_hash = passEncode(key)
        if col_apikeys.find_one({"hash": key_hash}) is not None:
            return key
        rejected_hashes.append(key_hash)

    if any(
        col_expired.find_one({"hash": key_hash}) is not None
        for key_hash in rejected_hashes
    ):
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN,
            detail=configGet("key_expired", "messages"),
        )

    raise HTTPException(
        status_code=HTTP_401_UNAUTHORIZED,
        detail=configGet("key_invalid", "messages"),
    )


def user_by_key(apikey: Union[str, APIKey]) -> Union[str, None]:
    """Return the ``user`` field of the record matching the key's hash.

    Args:
        apikey: The API key to look up; it is hashed with ``passEncode``
            before querying ``col_apikeys``.

    Returns:
        The stored ``user`` value, or ``None`` when no record matches.
    """
    db_key = col_apikeys.find_one({"hash": passEncode(apikey)})
    return db_key["user"] if db_key is not None else None


@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
    """Serve the Swagger UI page with a custom title and favicon."""
    return get_swagger_ui_html(
        openapi_url=app.openapi_url,  # type: ignore
        title=app.title + " - Documentation",
        swagger_favicon_url="/favicon.ico",
    )


@app.get("/redoc", include_in_schema=False)
async def custom_redoc_html():
    """Serve the ReDoc page with a custom title and favicon."""
    return get_redoc_html(
        openapi_url=app.openapi_url,  # type: ignore
        title=app.title + " - Documentation",
        redoc_favicon_url="/favicon.ico",
    )