import logging
from datetime import datetime
from io import BytesIO
from os import listdir, makedirs, path, sep
from typing import Dict, List
from xml.parsers.expat import ExpatError
from zipfile import ZipFile

from fastapi import Depends, HTTPException, UploadFile
from fastapi.openapi.models import APIKey
from fastapi.responses import FileResponse, Response, UJSONResponse
from starlette.status import HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE
from xmltodict import parse

from models.saves import StardewSave
from modules.app import app, get_api_key
from modules.utils import configGet, jsonLoad, jsonSave

logger = logging.getLogger(__name__)

_UPLOAD_ERROR_DETAIL = "You must provide two files: save file and SaveGameInfo for that save"


def _user_path(apikey: APIKey, *parts: str) -> str:
    """Join *parts* onto the storage directory that belongs to *apikey*."""
    return path.join(configGet("data", "locations"), "users", apikey, *parts)  # type: ignore


def _is_safe_component(name: str) -> bool:
    """Return True when *name* is a single path component.

    Request-supplied values are joined into filesystem paths, so anything
    that could climb out of the user's directory ("..", separators, NUL)
    is rejected.
    """
    if not name or name in (".", ".."):
        return False
    return not any(token in name for token in ("/", "\\", "\x00", sep))


def _upload_error() -> HTTPException:
    """Build the 406 error used for every rejected upload."""
    return HTTPException(HTTP_406_NOT_ACCEPTABLE, detail=_UPLOAD_ERROR_DETAIL)


def zipfiles(filenames, save_name: str) -> Response:
    """Pack *filenames* into an in-memory ZIP and return it as a download.

    Args:
        filenames: Iterable of paths of the files to include. Each file is
            stored at the archive root under its base name.
        save_name: Name of the download without extension; ".svsave" is
            appended.

    Returns:
        A Response carrying the archive bytes as an attachment.
    """
    zip_filename = save_name + ".svsave"
    buffer = BytesIO()

    # The context manager closes the archive (flushing its central directory)
    # even when adding a file fails.
    with ZipFile(buffer, "w") as archive:
        for fpath in filenames:
            # Store the file flat, without its directory part.
            archive.write(fpath, path.basename(fpath))

    # Grab ZIP file from in-memory, make response with correct MIME-type
    return Response(
        buffer.getvalue(),
        media_type="application/x-zip-compressed",
        headers={"Content-Disposition": f"attachment;filename={zip_filename}"},
    )


@app.get(
    "/saves",
    response_class=UJSONResponse,
    response_model=Dict[str, List[StardewSave]],
    description="Get all available game saves",
)
async def saves_get(apikey: APIKey = Depends(get_api_key)):
    """Return the index of every stored save of the caller, grouped by save id.

    Raises:
        HTTPException: 404 when the caller has no save directory.
    """
    user_dir = _user_path(apikey)
    if not path.isdir(user_dir):
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find any saves.")

    output: Dict[str, list] = {}
    for save_id in listdir(user_dir):
        save_dir = path.join(user_dir, save_id)
        logger.debug("Iterating through %s on %s", user_dir, save_id)
        # Stray files next to the save folders must not break the listing.
        if not path.isdir(save_dir):
            continue
        for save_date in listdir(save_dir):
            dated_dir = path.join(save_dir, save_date)
            logger.debug("Iterating through %s on %s", dated_dir, save_date)
            if path.isdir(dated_dir):
                output.setdefault(str(save_id), []).append(
                    jsonLoad(path.join(dated_dir, "index.json"))
                )
    return UJSONResponse(output)


@app.get(
    "/saves/{id}",
    response_class=UJSONResponse,
    response_model=List[StardewSave],
    description="Get game saves by name",
)
async def saves_get_by_id(id: str, apikey: APIKey = Depends(get_api_key)):
    """Return the index of every stored upload of the save with the given id.

    Raises:
        HTTPException: 404 when the id is not a plain directory name or no
            such save exists for the caller.
    """
    if not _is_safe_component(id):
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    save_dir = _user_path(apikey, id)
    if not path.isdir(save_dir):
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    output = [
        jsonLoad(path.join(save_dir, entry, "index.json"))
        for entry in listdir(save_dir)
        if path.isdir(path.join(save_dir, entry))
    ]
    return UJSONResponse(output)


@app.get(
    "/saves/{id}/download",
    response_class=FileResponse,
    description="Get game save as .svsave file by its id and save date",
)
async def saves_download(id: str, save_date: str, apikey: APIKey = Depends(get_api_key)):
    """Return one stored upload of a save as a ".svsave" ZIP archive.

    Raises:
        HTTPException: 404 when id or save_date is not a plain directory
            name or the requested upload does not exist for the caller.
    """
    # Both values come straight from the request and end up in a file path.
    if not (_is_safe_component(id) and _is_safe_component(save_date)):
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    save_path = _user_path(apikey, id, save_date)
    if not path.isdir(save_path):
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    return zipfiles(
        [
            path.join(save_path, f"{id}.txt"),
            path.join(save_path, "SaveGameInfo"),
            path.join(save_path, "index.json"),
        ],
        save_name=f"{id}_{save_date}",
    )


@app.post(
    "/saves",
    response_class=UJSONResponse,
    response_model=StardewSave,
    description="Upload new save",
)
async def saves_post(device: str, files: List[UploadFile], apikey: APIKey = Depends(get_api_key)):
    """Store an uploaded save (save file + SaveGameInfo) and return its index.

    Exactly two files are expected: one named "SaveGameInfo" whose XML root
    is "Farmer", and the save itself whose XML root is "SaveGame".

    Raises:
        HTTPException: 406 when the upload does not consist of those two
            files, cannot be parsed as UTF-8 XML, or lacks the fields
            needed to build the index.
    """
    if len(files) != 2 or all(file.filename != "SaveGameInfo" for file in files):
        raise _upload_error()

    save_info = save_data = save_info_file = save_data_file = None

    for file in files:
        raw = await file.read()
        try:
            parsed = parse(raw.decode("utf-8"))
        except (UnicodeDecodeError, ExpatError):
            raise _upload_error() from None

        if file.filename == "SaveGameInfo":
            if "Farmer" not in parsed:
                raise _upload_error()
            save_info_file, save_info = raw, parsed
        else:
            if "SaveGame" not in parsed:
                raise _upload_error()
            save_data_file, save_data = raw, parsed

    if save_info is None or save_data is None or save_info_file is None or save_data_file is None:
        raise _upload_error()

    now = datetime.now()
    save_date = now.strftime("%Y%m%d_%H%M%S")

    # Build the index before touching the disk so that a malformed save is
    # rejected without leaving a half-written directory behind.
    try:
        save_id = save_data["SaveGame"]["uniqueIDForThisGame"]
        # The id becomes a directory name and is stored as an int, so only
        # plain ASCII digits are acceptable (this also rules out "..").
        if not (isinstance(save_id, str) and save_id.isascii() and save_id.isdigit()):
            raise ValueError("uniqueIDForThisGame is not a plain integer")
        farmer = save_info["Farmer"]
        index = {
            "id": int(save_id),
            "device": device,
            "farmer": farmer["name"],
            "year": int(farmer["yearForSaveGame"]),
            "season": int(farmer["seasonForSaveGame"]),
            "day": int(farmer["dayOfMonthForSaveGame"]),
            "money": int(farmer["money"]),
            "played": int(farmer["millisecondsPlayed"]),
            "save_time": int(farmer["saveTime"]),
            "save_date": save_date,
            "uploaded": now.isoformat(),
        }
    except (KeyError, TypeError, ValueError):
        raise _upload_error() from None

    save_dir = _user_path(apikey, save_id, save_date)
    makedirs(save_dir, exist_ok=True)

    # The save itself goes to "<id>.txt"; the SaveGameInfo bytes keep their name.
    with open(path.join(save_dir, save_id + ".txt"), "wb") as f:
        f.write(save_data_file)

    with open(path.join(save_dir, "SaveGameInfo"), "wb") as f:
        f.write(save_info_file)

    jsonSave(index, path.join(save_dir, "index.json"))

    return UJSONResponse(index)