170 lines
7.7 KiB
Python
170 lines
7.7 KiB
Python
from datetime import datetime
|
|
from io import BytesIO
|
|
from os import listdir, makedirs, path, rmdir, sep
|
|
from typing import Dict, List
|
|
from zipfile import ZipFile
|
|
from xmltodict import parse
|
|
from models.saves import StardewSave
|
|
from modules.app import app, get_api_key
|
|
from modules.utils import configGet, jsonLoad, jsonSave
|
|
from fastapi import HTTPException, Depends, UploadFile
|
|
from fastapi.responses import UJSONResponse, FileResponse, Response
|
|
from fastapi.openapi.models import APIKey
|
|
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE
|
|
|
|
|
|
def zipfiles(filenames, save_name: str) -> Response:
|
|
|
|
zip_filename = save_name+".svsave"
|
|
|
|
s = BytesIO()
|
|
zf = ZipFile(s, "w")
|
|
|
|
for fpath in filenames:
|
|
|
|
# Calculate path for file in zip
|
|
fdir, fname = path.split(fpath)
|
|
|
|
# Add file, at correct path
|
|
zf.write(fpath, fname)
|
|
|
|
# Must close zip for all contents to be written
|
|
zf.close()
|
|
|
|
# Grab ZIP file from in-memory, make response with correct MIME-type
|
|
return Response(
|
|
s.getvalue(),
|
|
media_type="application/x-zip-compressed",
|
|
headers={
|
|
'Content-Disposition': f'attachment;filename={zip_filename}'
|
|
}
|
|
)
|
|
|
|
|
|
@app.get("/saves", response_class=UJSONResponse, response_model=Dict[str, StardewSave], description="Get all available game saves")
|
|
async def saves_get(apikey: APIKey = Depends(get_api_key)):
|
|
save_path = path.join(configGet("data", "locations"), "users", apikey) # type: ignore
|
|
if path.exists(save_path):
|
|
output = {}
|
|
for id in listdir(save_path):
|
|
print("Iterating through", save_path, "on", id)
|
|
for dir in listdir(path.join(save_path, id)):
|
|
print("Iterating through", path.join(save_path, id, dir), "on", dir)
|
|
d = path.join(save_path, id, dir)
|
|
if path.isdir(d):
|
|
if str(id) not in output:
|
|
output[str(id)] = []
|
|
output[str(id)].append(jsonLoad(path.join(configGet("data", "locations"), "users", apikey, id, dir, "index.json"))) # type: ignore
|
|
return UJSONResponse(output)
|
|
else:
|
|
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find any saves.")
|
|
|
|
|
|
@app.get("/saves/{id}", response_class=UJSONResponse, response_model=List[StardewSave], description="Get game saves by name")
|
|
async def saves_get_by_id(id: str, apikey: APIKey = Depends(get_api_key)):
|
|
save_path = path.join(configGet("data", "locations"), "users", apikey, id) # type: ignore
|
|
if path.exists(save_path):
|
|
output = []
|
|
for dir in listdir(save_path):
|
|
d = path.join(save_path, dir)
|
|
if path.isdir(d):
|
|
output.append(jsonLoad(path.join(configGet("data", "locations"), "users", apikey, id, dir, "index.json"))) # type: ignore
|
|
return UJSONResponse(output)
|
|
else:
|
|
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
|
|
|
|
|
|
@app.delete("/saves/{id}", description="Get game saves by name")
|
|
async def saves_delete_by_id(id: str, apikey: APIKey = Depends(get_api_key)):
|
|
save_path = path.join(configGet("data", "locations"), "users", apikey, id) # type: ignore
|
|
if path.exists(save_path):
|
|
rmdir(save_path)
|
|
return Response(status_code=HTTP_204_NO_CONTENT)
|
|
else:
|
|
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
|
|
|
|
|
|
@app.delete("/saves/{id}/{save_date}", response_class=UJSONResponse, response_model=List[StardewSave], description="Get game saves by name")
|
|
async def saves_delete_by_both_ids(id: str, save_date: str, apikey: APIKey = Depends(get_api_key)):
|
|
save_path = path.join(configGet("data", "locations"), "users", apikey, id, save_date) # type: ignore
|
|
if path.exists(save_path):
|
|
rmdir(save_path)
|
|
return Response(status_code=HTTP_204_NO_CONTENT)
|
|
else:
|
|
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
|
|
|
|
|
|
@app.get("/saves/{id}/{save_date}", response_class=UJSONResponse, response_model=List[StardewSave], description="Get game saves by name")
|
|
async def saves_get_by_both_ids(id: str, save_date: str, apikey: APIKey = Depends(get_api_key)):
|
|
save_path = path.join(configGet("data", "locations"), "users", apikey, id, save_date) # type: ignore
|
|
if path.exists(save_path):
|
|
return UJSONResponse(jsonLoad(save_path+sep+"index.json"))
|
|
else:
|
|
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
|
|
|
|
|
|
@app.get("/saves/{id}/{save_date}/download", response_class=FileResponse, description="Get game save as .svsave file by its id and save date")
|
|
async def saves_download(id: str, save_date: str, apikey: APIKey = Depends(get_api_key)):
|
|
if path.exists(path.join(configGet("data", "locations"), "users", apikey, id, save_date)): # type: ignore
|
|
save_path = path.join(configGet("data", "locations"), "users", apikey, id, save_date) # type: ignore
|
|
return zipfiles([f"{save_path}{sep}{id}", f"{save_path}{sep}SaveGameInfo", f"{save_path}{sep}index.json"], save_name=f"{id}_{save_date}")
|
|
else:
|
|
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
|
|
|
|
|
|
@app.post("/saves", response_class=UJSONResponse, response_model=StardewSave, description="Upload new save")
|
|
async def saves_post(device: str, files: List[UploadFile], apikey: APIKey = Depends(get_api_key)):
|
|
|
|
error_return = HTTPException(HTTP_406_NOT_ACCEPTABLE, detail="You must provide two files: save file and SaveGameInfo for that save")
|
|
|
|
if len(files) != 2:
|
|
return error_return
|
|
|
|
if "SaveGameInfo" not in [file.filename for file in files]:
|
|
return error_return
|
|
|
|
save_info = save_data = save_info_file = save_data_file = None
|
|
|
|
for file in files:
|
|
if file.filename == "SaveGameInfo":
|
|
save_info_file = await file.read()
|
|
save_info = parse(save_info_file.decode("utf-8"))
|
|
if "Farmer" not in save_info:
|
|
return error_return
|
|
else:
|
|
save_data_file = await file.read()
|
|
save_data = parse(save_data_file.decode("utf-8"))
|
|
if "SaveGame" not in save_data:
|
|
return error_return
|
|
|
|
if save_info is None or save_data is None or save_info_file is None or save_data_file is None:
|
|
return error_return
|
|
|
|
now = datetime.now()
|
|
save_date = now.strftime("%Y%m%d_%H%M%S")
|
|
|
|
makedirs(path.join(configGet("data", "locations"), "users", apikey, save_data["SaveGame"]["uniqueIDForThisGame"], save_date), exist_ok=True) # type: ignore
|
|
|
|
with open(path.join(configGet("data", "locations"), "users", apikey, save_data["SaveGame"]["uniqueIDForThisGame"], save_date, "SaveGameInfo"), "wb") as f: # type: ignore
|
|
f.write(save_info_file)
|
|
|
|
with open(path.join(configGet("data", "locations"), "users", apikey, save_data["SaveGame"]["uniqueIDForThisGame"], save_date, save_data["SaveGame"]["uniqueIDForThisGame"]), "wb") as f: # type: ignore
|
|
f.write(save_data_file)
|
|
|
|
index = {
|
|
"id": int(save_data["SaveGame"]["uniqueIDForThisGame"]),
|
|
"device": device,
|
|
"farmer": save_info["Farmer"]["name"],
|
|
"year": int(save_info["Farmer"]["yearForSaveGame"]),
|
|
"season": int(save_info["Farmer"]["seasonForSaveGame"]),
|
|
"day": int(save_info["Farmer"]["dayOfMonthForSaveGame"]),
|
|
"money": int(save_info["Farmer"]["money"]),
|
|
"played": int(save_info["Farmer"]["millisecondsPlayed"]),
|
|
"save_time": int(save_info["Farmer"]["saveTime"]),
|
|
"save_date": save_date,
|
|
"uploaded": now.isoformat()
|
|
}
|
|
|
|
jsonSave(index, path.join(configGet("data", "locations"), "users", apikey, save_data["SaveGame"]["uniqueIDForThisGame"], save_date, "index.json")) # type: ignore
|
|
|
|
return UJSONResponse(index) |