SyncAPI/extensions/saves.py
2023-01-16 15:33:42 +01:00

141 lines
6.2 KiB
Python

from datetime import datetime
from io import BytesIO
from os import listdir, makedirs, path, sep
from typing import Dict, List
from zipfile import ZipFile
from xmltodict import parse
from models.saves import StardewSave
from modules.app import app, get_api_key
from modules.utils import configGet, jsonLoad, jsonSave
from fastapi import HTTPException, Depends, UploadFile
from fastapi.responses import UJSONResponse, FileResponse, Response
from fastapi.openapi.models import APIKey
from starlette.status import HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE
def zipfiles(filenames, save_name: str) -> Response:
    """Pack the given files into an in-memory ZIP archive and return it as a download.

    Args:
        filenames: Iterable of paths of the files to add. Every file is stored
            at the root of the archive under its base name, so the directory
            part of each source path is dropped.
        save_name: Base name of the archive; ".svsave" is appended to it.

    Returns:
        A Response whose body is the ZIP archive, sent as an attachment named
        "<save_name>.svsave".
    """
    zip_filename = save_name + ".svsave"
    buffer = BytesIO()
    # The context manager closes the archive even when adding a file fails;
    # closing is also what flushes the ZIP central directory into the buffer,
    # so the buffer must only be read after the "with" block has exited.
    with ZipFile(buffer, "w") as archive:
        for fpath in filenames:
            # Store the file flat, under its base name only
            archive.write(fpath, path.basename(fpath))
    # Grab ZIP file from in-memory, make response with correct MIME-type
    return Response(
        buffer.getvalue(),
        media_type="application/x-zip-compressed",
        headers={
            'Content-Disposition': f'attachment;filename={zip_filename}'
        }
    )
@app.get("/saves", response_class=UJSONResponse, response_model=Dict[str, List[StardewSave]], description="Get all available game saves")
async def saves_get(apikey: APIKey = Depends(get_api_key)):
    """List every stored save snapshot of the authenticated user.

    The user's storage directory is laid out as "<save id>/<snapshot>/index.json".
    Each snapshot directory contributes the content of its index.json.

    Args:
        apikey: API key of the caller, resolved by the get_api_key dependency;
            it is used as the name of the user's storage directory.

    Returns:
        A UJSONResponse mapping each save id to the list of its snapshot
        indexes. The declared response model is a dict of lists to match this.

    Raises:
        HTTPException: 404 when the user has no storage directory. The
            exception is raised (not returned) so FastAPI turns it into an
            actual 404 error response.
    """
    save_path = path.join(configGet("data", "locations"), "users", apikey)  # type: ignore
    if not path.isdir(save_path):
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find any saves.")

    output = {}
    for save_id in listdir(save_path):
        print("Iterating through", save_path, "on", save_id)
        game_dir = path.join(save_path, save_id)
        # A stray regular file next to the save directories must not break the listing
        if not path.isdir(game_dir):
            continue
        for entry in listdir(game_dir):
            snapshot_dir = path.join(game_dir, entry)
            print("Iterating through", snapshot_dir, "on", entry)
            if path.isdir(snapshot_dir):
                # The key is created lazily, so ids without snapshots are omitted
                output.setdefault(str(save_id), []).append(jsonLoad(path.join(snapshot_dir, "index.json")))
    return UJSONResponse(output)
@app.get("/saves/{id}", response_class=UJSONResponse, response_model=List[StardewSave], description="Get game saves by name")
async def saves_get_by_id(id: str, apikey: APIKey = Depends(get_api_key)):
    """List the stored snapshots of one save of the authenticated user.

    Args:
        id: Save id taken from the URL; it names a directory inside the user's
            storage directory.
        apikey: API key of the caller, resolved by the get_api_key dependency;
            it is used as the name of the user's storage directory.

    Returns:
        A UJSONResponse with the content of index.json of every snapshot
        directory found for this save.

    Raises:
        HTTPException: 404 when the id is not a plain directory name or no
            such save directory exists. The exception is raised (not returned)
            so FastAPI turns it into an actual 404 error response.
    """
    not_found = HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    # The id comes from the client and becomes a path component: reject
    # anything that could point outside of the user's own directory.
    if id in ("", ".", "..") or path.basename(id) != id:
        raise not_found

    save_path = path.join(configGet("data", "locations"), "users", apikey, id)  # type: ignore
    if not path.isdir(save_path):
        raise not_found

    output = []
    for entry in listdir(save_path):
        snapshot_dir = path.join(save_path, entry)
        if path.isdir(snapshot_dir):
            output.append(jsonLoad(path.join(snapshot_dir, "index.json")))
    return UJSONResponse(output)
@app.get("/saves/{id}/download", response_class=FileResponse, description="Get game save as .svsave file by its id and save date")
async def saves_download(id: str, save_date: str, apikey: APIKey = Depends(get_api_key)):
    """Send one stored snapshot of a save as a ".svsave" ZIP archive.

    Args:
        id: Save id taken from the URL; it names a directory inside the user's
            storage directory and the "<id>.txt" save file inside the snapshot.
        save_date: Name of the snapshot directory to download (query parameter).
        apikey: API key of the caller, resolved by the get_api_key dependency;
            it is used as the name of the user's storage directory.

    Returns:
        The response built by zipfiles(): an archive named
        "<id>_<save_date>.svsave" holding "<id>.txt", "SaveGameInfo" and
        "index.json" of the snapshot.

    Raises:
        HTTPException: 404 when id or save_date is not a plain directory name
            or the snapshot directory does not exist. The exception is raised
            (not returned) so FastAPI turns it into an actual 404 error response.
    """
    not_found = HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    # Both values come from the client and become path components: reject
    # anything that could point outside of the user's own directory
    # (for example save_date="../../<another user>/<id>/<date>").
    for component in (id, save_date):
        if component in ("", ".", "..") or path.basename(component) != component:
            raise not_found

    save_path = path.join(configGet("data", "locations"), "users", apikey, id, save_date)  # type: ignore
    if not path.isdir(save_path):
        raise not_found

    snapshot_files = [
        path.join(save_path, f"{id}.txt"),
        path.join(save_path, "SaveGameInfo"),
        path.join(save_path, "index.json"),
    ]
    return zipfiles(snapshot_files, save_name=f"{id}_{save_date}")
@app.post("/saves", response_class=UJSONResponse, response_model=StardewSave, description="Upload new save")
async def saves_post(device: str, files: List[UploadFile], apikey: APIKey = Depends(get_api_key)):
    """Store an uploaded save as a new snapshot of the authenticated user.

    Exactly two XML files are expected: one named "SaveGameInfo" (root element
    "Farmer") and the save file itself (root element "SaveGame"). Both are
    written unchanged to "<user dir>/<game id>/<upload timestamp>/", next to
    an "index.json" holding the summary that is also returned.

    Args:
        device: Name of the device the save was uploaded from; stored in the index.
        files: The two uploaded files described above.
        apikey: API key of the caller, resolved by the get_api_key dependency;
            it is used as the name of the user's storage directory.

    Returns:
        A UJSONResponse with the index of the snapshot that was created.

    Raises:
        HTTPException: 406 when the upload is not the expected pair of files,
            a file is not valid UTF-8 XML, or a required field is missing or
            not an integer. The exception is raised (not returned) so FastAPI
            turns it into an actual 406 error response.
    """
    # Local import: the exception type the expat-based XML parser raises on malformed input
    from xml.parsers.expat import ExpatError

    error_return = HTTPException(HTTP_406_NOT_ACCEPTABLE, detail="You must provide two files: save file and SaveGameInfo for that save")

    if len(files) != 2:
        raise error_return
    if "SaveGameInfo" not in [file.filename for file in files]:
        raise error_return

    save_info = save_data = save_info_file = save_data_file = None
    for file in files:
        content = await file.read()
        try:
            parsed = parse(content.decode("utf-8"))
        except (UnicodeDecodeError, ExpatError):
            # Not UTF-8 or not well-formed XML: a client error, not a server error
            raise error_return from None
        if file.filename == "SaveGameInfo":
            if "Farmer" not in parsed:
                raise error_return
            save_info_file, save_info = content, parsed
        else:
            if "SaveGame" not in parsed:
                raise error_return
            save_data_file, save_data = content, parsed

    # Two files both named "SaveGameInfo" would leave the save file missing
    if save_info is None or save_data is None or save_info_file is None or save_data_file is None:
        raise error_return

    now = datetime.now()
    save_date = now.strftime("%Y%m%d_%H%M%S")

    # Build and validate the whole index BEFORE touching the disk, so that a
    # malformed upload can not leave a snapshot directory without index.json.
    try:
        farmer = save_info["Farmer"]
        # The game id comes from the uploaded file and becomes a directory
        # name: forcing it through int() guarantees it can not contain path
        # separators or ".." components.
        game_id = int(save_data["SaveGame"]["uniqueIDForThisGame"])
        index = {
            "id": game_id,
            "device": device,
            "farmer": farmer["name"],
            "year": int(farmer["yearForSaveGame"]),
            "season": int(farmer["seasonForSaveGame"]),
            "day": int(farmer["dayOfMonthForSaveGame"]),
            "money": int(farmer["money"]),
            "played": int(farmer["millisecondsPlayed"]),
            "save_time": int(farmer["saveTime"]),
            "save_date": save_date,
            "uploaded": now.isoformat()
        }
    except (KeyError, TypeError, ValueError):
        raise error_return from None

    save_id = str(game_id)
    save_dir = path.join(configGet("data", "locations"), "users", apikey, save_id, save_date)  # type: ignore
    makedirs(save_dir, exist_ok=True)

    # Store the uploaded bytes untouched
    with open(path.join(save_dir, "SaveGameInfo"), "wb") as f:
        f.write(save_info_file)
    with open(path.join(save_dir, save_id + ".txt"), "wb") as f:
        f.write(save_data_file)

    jsonSave(index, path.join(save_dir, "index.json"))
    return UJSONResponse(index)