"""HTTP endpoints for listing, downloading, uploading and deleting game saves."""

from contextlib import suppress
from datetime import datetime, timezone
from os import remove
from typing import Dict, List, Literal, Union
from urllib.parse import quote_plus
from xml.parsers.expat import ExpatError

from fastapi import Depends, HTTPException, UploadFile
from fastapi.openapi.models import APIKey
from fastapi.responses import FileResponse, Response, UJSONResponse
from pymongo import DESCENDING
from starlette.status import (
    HTTP_204_NO_CONTENT,
    HTTP_403_FORBIDDEN,
    HTTP_404_NOT_FOUND,
    HTTP_406_NOT_ACCEPTABLE,
)
from xmltodict import parse

from models.saves import StardewSave, StardewSaveBrief
from modules.app import app, get_api_key, user_by_key
from modules.database import col_devices, col_saves
from modules.utils import configGet, zip_saves

# Fields of a stored save document that are never sent back to API clients.
_PRIVATE_FIELDS = ("_id", "user", "file")


def _public_view(entry: dict) -> dict:
    """Strip storage-only fields from a save document in place and return it."""
    for field in _PRIVATE_FIELDS:
        entry.pop(field, None)
    return entry


def _remove_save_file(entry: dict) -> None:
    """Delete the archive a save document points to, tolerating a missing file."""
    # A file that is already gone is the state we want anyway; failing here
    # would leave the database record impossible to delete through the API.
    with suppress(FileNotFoundError):
        remove(entry["file"]["path"])


def _invalid_upload() -> HTTPException:
    """Build the 406 error used for every malformed save upload."""
    return HTTPException(
        HTTP_406_NOT_ACCEPTABLE,
        detail="You must provide two files: save file and SaveGameInfo for that save",
    )


@app.get(
    "/saves",
    response_class=UJSONResponse,
    response_model=Union[List[Dict[str, StardewSave]], List[StardewSaveBrief]],
    description="Get all available game saves",
)
async def saves_get(
    device: Union[str, None] = None,
    version: Union[str, None] = None,
    only_ids: bool = False,
    apikey: APIKey = Depends(get_api_key),
):
    """Return the caller's saves, newest first.

    Args:
        device: Optional device name to filter by.
        version: Optional game version to filter by.
        only_ids: When true, keep only the newest entry for each save id.
        apikey: API key identifying the caller.

    Raises:
        HTTPException: 404 when no save matches the filters.
    """
    query = {"user": user_by_key(apikey)}
    if device is not None:
        query["device"] = device
    if version is not None:
        query["data.game_version"] = version

    entries = list(col_saves.find(query).sort("date", DESCENDING))
    if not entries:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find any saves.")

    output = []
    seen_ids = set()
    for entry in entries:
        if only_ids:
            # Entries are sorted newest first, so the first hit per id wins.
            if entry["id"] in seen_ids:
                continue
            seen_ids.add(entry["id"])
        output.append(_public_view(entry))

    return UJSONResponse(output)


@app.get(
    "/saves/{id}",
    response_class=UJSONResponse,
    response_model=List[StardewSave],
    description="Get game saves by name",
)
async def saves_get_by_id(
    id: int,
    device: Union[str, None] = None,
    version: Union[str, None] = None,
    apikey: APIKey = Depends(get_api_key),
):
    """Return every upload of the caller's save with the given id, newest first.

    Args:
        id: Save id to look up.
        device: Optional device name to filter by.
        version: Optional game version to filter by.
        apikey: API key identifying the caller.

    Raises:
        HTTPException: 404 when the caller has no save with that id.
    """
    # Scope the lookup to the caller so one user cannot read another's saves.
    query = {"user": user_by_key(apikey), "id": id}
    if device is not None:
        query["device"] = device
    if version is not None:
        query["data.game_version"] = version

    entries = list(col_saves.find(query).sort("date", DESCENDING))
    if not entries:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    return UJSONResponse([_public_view(entry) for entry in entries])


@app.get(
    "/saves/{id}/{save_date}",
    response_class=UJSONResponse,
    response_model=StardewSave,
    description="Get game saves by id and upload date",
)
async def saves_get_by_both_ids(
    id: int,
    save_date: Union[int, Literal["latest"]],
    device: Union[str, None] = None,
    apikey: APIKey = Depends(get_api_key),
):
    """Return a single upload of the caller's save.

    Args:
        id: Save id to look up.
        save_date: Upload timestamp, or ``"latest"`` for the newest upload.
        device: Optional device name to filter by.
        apikey: API key identifying the caller.

    Raises:
        HTTPException: 404 when no matching upload exists.
    """
    query = {"user": user_by_key(apikey), "id": id}
    # "latest" is a selector, not a stored value: leave the date out of the
    # filter and let the descending sort pick the newest upload.
    if save_date != "latest":
        query["date"] = save_date
    if device is not None:
        query["device"] = device

    saves_entry = col_saves.find_one(query, sort=[("date", DESCENDING)])
    if saves_entry is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    return UJSONResponse(_public_view(saves_entry))


@app.delete("/saves/{id}", description="Delete game saves by id")
async def saves_delete_by_id(id: int, apikey: APIKey = Depends(get_api_key)):
    """Delete every upload of the caller's save with the given id.

    Removes the stored archives from disk, then the database records.

    Raises:
        HTTPException: 404 when the caller has no save with that id.
    """
    query = {"user": user_by_key(apikey), "id": id}

    entries = list(col_saves.find(query))
    if not entries:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    for entry in entries:
        _remove_save_file(entry)
    col_saves.delete_many(query)

    return Response(status_code=HTTP_204_NO_CONTENT)


@app.delete(
    "/saves/{id}/{save_date}",
    response_class=UJSONResponse,
    description="Delete game saves by id and upload date",
)
async def saves_delete_by_both_ids(id: int, save_date: int, apikey: APIKey = Depends(get_api_key)):
    """Delete one upload of the caller's save, identified by id and upload date.

    Raises:
        HTTPException: 404 when no matching upload exists.
    """
    # Scope the delete to the caller so one user cannot remove another's save.
    saves_entry = col_saves.find_one_and_delete(
        {"user": user_by_key(apikey), "id": id, "date": save_date}
    )
    if saves_entry is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    _remove_save_file(saves_entry)
    return Response(status_code=HTTP_204_NO_CONTENT)


@app.get(
    "/saves/{id}/{save_date}/download",
    response_class=FileResponse,
    description="Get game save as .svsave file by its id and upload date",
)
async def saves_download(
    id: int,
    save_date: int,
    device: Union[str, None] = None,
    apikey: APIKey = Depends(get_api_key),
):
    """Send the stored archive of one upload as an ``.svsave`` attachment.

    Args:
        id: Save id to look up.
        save_date: Upload timestamp of the wanted upload.
        device: Optional device name to filter by.
        apikey: API key identifying the caller.

    Raises:
        HTTPException: 404 when no matching upload exists.
    """
    query = {"user": user_by_key(apikey), "id": id, "date": save_date}
    if device is not None:
        query["device"] = device

    saves_entry = col_saves.find_one(query)
    if saves_entry is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    with open(saves_entry["file"]["path"], "rb") as file:
        payload = file.read()

    return Response(
        payload,
        media_type="application/x-zip-compressed",
        headers={
            'Content-Disposition': f'attachment;filename={quote_plus(saves_entry["file"]["name"])}.svsave'
        },
    )


@app.post(
    "/saves",
    response_class=UJSONResponse,
    response_model=StardewSave,
    description="Upload new save",
)
async def saves_post(device: str, files: List[UploadFile], apikey: APIKey = Depends(get_api_key)):
    """Store a new save upload for the caller and return its public record.

    Expects exactly two files: one named ``SaveGameInfo`` whose XML contains a
    ``Farmer`` element, and the save file itself whose XML contains a
    ``SaveGame`` element.

    Args:
        device: Name of one of the caller's registered devices.
        files: The two uploaded files described above.
        apikey: API key identifying the caller.

    Returns:
        The stored record without its internal fields, or a 403 JSON response
        when the configured per-user save limit is reached.

    Raises:
        HTTPException: 406 when the upload is malformed, 404 when the device
            is not registered for the caller.
    """
    user = user_by_key(apikey)

    # A limit of -1 disables the per-user quota.
    limit = configGet("saves", "limits")
    if limit != -1 and col_saves.count_documents({"user": user}) >= limit:
        return UJSONResponse(
            {
                "detail": f'Too many save files. This instance allows to store only {limit} saves per user',
                "limit": limit,
            },
            status_code=HTTP_403_FORBIDDEN,
        )

    if len(files) != 2:
        raise _invalid_upload()
    if "SaveGameInfo" not in [file.filename for file in files]:
        raise _invalid_upload()

    if col_devices.find_one({"user": user, "name": device}) is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find device with that name.")

    save_info = save_data = save_info_file = save_data_filename = save_data_file = None

    for file in files:
        raw = await file.read()
        try:
            parsed = parse(raw.decode("utf-8"))
        except (UnicodeDecodeError, ExpatError) as exc:
            # Undecodable or non-XML uploads are a client error, not a crash.
            raise _invalid_upload() from exc

        if file.filename == "SaveGameInfo":
            if "Farmer" not in parsed:
                raise _invalid_upload()
            save_info_file, save_info = raw, parsed
        else:
            if "SaveGame" not in parsed:
                raise _invalid_upload()
            save_data_filename, save_data_file, save_data = file.filename, raw, parsed

    # Catches the case where both uploads were named "SaveGameInfo".
    if (
        save_info is None
        or save_data is None
        or save_info_file is None
        or save_data_filename is None
        or save_data_file is None
    ):
        raise _invalid_upload()

    zipped = zip_saves(save_data_filename, save_data_file, save_info_file)
    save_date = int(datetime.now(tz=timezone.utc).timestamp())
    farmer = save_info["Farmer"]

    index = {
        "id": int(save_data["SaveGame"]["uniqueIDForThisGame"]),
        "user": user,
        "device": device,
        "date": save_date,
        "data": {
            "farmer": farmer["name"],
            "money": int(farmer["money"]),
            "played": int(farmer["millisecondsPlayed"]),
            "save_time": int(farmer["saveTime"]),
            "year": int(farmer["yearForSaveGame"]),
            "season": int(farmer["seasonForSaveGame"]),
            "day": int(farmer["dayOfMonthForSaveGame"]),
            "game_version": farmer["gameVersion"],
        },
        "file": {
            "name": save_data_filename,
            "uuid": zipped[0],
            "path": zipped[1],
        },
    }

    col_saves.insert_one(index)

    # Drop the parsed XML trees early; they are no longer needed.
    del save_info, save_data, farmer

    col_devices.find_one_and_update(
        {"user": user, "name": device},
        {"$set": {"last_save": save_date}},
    )

    # insert_one added "_id" to the dict; strip it with the other private fields.
    return UJSONResponse(_public_view(index))