SyncAPI/extensions/saves.py

209 lines
8.2 KiB
Python

from datetime import datetime
from io import BytesIO
from urllib.parse import quote_plus
from os import path, remove
from typing import Dict, List, Union
from zipfile import ZipFile
from xmltodict import parse
from models.saves import StardewSave
from modules.app import app, get_api_key, user_by_key
from modules.utils import saveFile
from modules.database import col_devices, col_saves
from fastapi import HTTPException, Depends, UploadFile
from fastapi.responses import UJSONResponse, FileResponse, Response
from fastapi.openapi.models import APIKey
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE
def zipfiles(filenames, save_name: str) -> Response:
zip_filename = save_name+".svsave"
s = BytesIO()
zf = ZipFile(s, "w")
for fpath in filenames:
# Calculate path for file in zip
fdir, fname = path.split(fpath)
# Add file, at correct path
for entry in (list(col_saves.find({"files.save.uuid": fname})) + list(col_saves.find({"files.saveinfo.uuid": fname}))):
filename = entry["files"]["save"]["name"] if (entry["files"]["save"]["uuid"] == fname) else entry["files"]["saveinfo"]["name"]
zf.write(fpath, filename)
# Must close zip for all contents to be written
zf.close()
# Grab ZIP file from in-memory, make response with correct MIME-type
return Response(
s.getvalue(),
media_type="application/x-zip-compressed",
headers={
'Content-Disposition': f'attachment;filename={quote_plus(zip_filename)}'
}
)
@app.get("/saves", response_class=UJSONResponse, response_model=Dict[str, StardewSave], description="Get all available game saves")
async def saves_get(apikey: APIKey = Depends(get_api_key)):
saves_entries = list(col_saves.find({"user": user_by_key(apikey)}))
if len(saves_entries) == 0:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find any saves.")
output = []
for entry in saves_entries:
out_entry = entry
del out_entry["_id"]
del out_entry["user"]
del out_entry["files"]
output.append(out_entry)
return UJSONResponse(output)
@app.get("/saves/{id}", response_class=UJSONResponse, response_model=List[StardewSave], description="Get game saves by name")
async def saves_get_by_id(id: int, device: Union[str, None] = None, apikey: APIKey = Depends(get_api_key)):
saves_entries = list(col_saves.find({"id": id})) if device is None else list(col_saves.find({"id": id, "device": device}))
if len(saves_entries) == 0:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
output = []
for entry in saves_entries:
out_entry = entry
del out_entry["_id"]
del out_entry["files"]
del out_entry["user"]
output.append(out_entry)
return UJSONResponse(output)
@app.get("/saves/{id}/{save_date}", response_class=UJSONResponse, response_model=List[StardewSave], description="Get game saves by name")
async def saves_get_by_both_ids(id: int, save_date: int, apikey: APIKey = Depends(get_api_key)):
saves_entry = col_saves.find_one({"user": user_by_key(apikey), "id": id, "date": save_date})
if saves_entry is not None:
del saves_entry["_id"]
del saves_entry["files"]
return UJSONResponse(saves_entry)
else:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
@app.delete("/saves/{id}", description="Get game saves by name")
async def saves_delete_by_id(id: int, apikey: APIKey = Depends(get_api_key)):
user = user_by_key(apikey)
if col_saves.count_documents({"user": user, "id": id}) > 0:
saves_entries = list(col_saves.find({"user": user, "id": id}))
for entry in saves_entries:
remove(entry["files"]["save"]["path"])
remove(entry["files"]["saveinfo"]["path"])
col_saves.delete_many({"user": user, "id": id})
return Response(status_code=HTTP_204_NO_CONTENT)
else:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
@app.delete("/saves/{id}/{save_date}", response_class=UJSONResponse, description="Get game saves by name")
async def saves_delete_by_both_ids(id: int, save_date: int, apikey: APIKey = Depends(get_api_key)):
saves_entry = col_saves.find_one_and_delete({"id": id, "date": save_date})
if saves_entry is not None:
remove(saves_entry["files"]["save"]["path"])
remove(saves_entry["files"]["saveinfo"]["path"])
return Response(status_code=HTTP_204_NO_CONTENT)
else:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
@app.get("/saves/{id}/{save_date}/download", response_class=FileResponse, description="Get game save as .svsave file by its id and save date")
async def saves_download(id: int, save_date: int, apikey: APIKey = Depends(get_api_key)):
saves_entry = col_saves.find_one({"user": user_by_key(apikey), "id": id, "date": save_date})
if saves_entry is not None: # type: ignore
return zipfiles([saves_entry["files"]["save"]["path"], saves_entry["files"]["saveinfo"]["path"]], save_name=f'{saves_entry["data"]["farmer"]}_{saves_entry["id"]}')
else:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")
@app.post("/saves", response_class=UJSONResponse, response_model=StardewSave, description="Upload new save")
async def saves_post(device: str, files: List[UploadFile], apikey: APIKey = Depends(get_api_key)):
user = user_by_key(apikey)
error_return = HTTPException(HTTP_406_NOT_ACCEPTABLE, detail="You must provide two files: save file and SaveGameInfo for that save")
if len(files) != 2:
return error_return
if "SaveGameInfo" not in [file.filename for file in files]:
return error_return
if col_devices.find_one({"user": user, "name": device}) is None:
raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find device with that name.")
save_info = save_data = save_info_file = save_data_filename = save_data_file = save_info_file_id = save_data_file_id = None
for file in files:
if file.filename == "SaveGameInfo":
save_info_file = await file.read()
save_info_file_id = saveFile(save_info_file)
save_info = parse(save_info_file.decode("utf-8"))
if "Farmer" not in save_info:
return error_return
else:
save_data_filename = file.filename
save_data_file = await file.read()
save_data_file_id = saveFile(save_data_file)
save_data = parse(save_data_file.decode("utf-8"))
if "SaveGame" not in save_data:
return error_return
if save_info is None or save_data is None or save_info_file is None or save_data_filename is None or save_data_file is None or save_info_file_id is None or save_data_file_id is None:
return error_return
save_date = int(datetime.utcnow().timestamp())
index = {
"id": int(save_data["SaveGame"]["uniqueIDForThisGame"]),
"user": user,
"device": device,
"date": save_date,
"data": {
"farmer": save_info["Farmer"]["name"],
"money": int(save_info["Farmer"]["money"]),
"played": int(save_info["Farmer"]["millisecondsPlayed"]),
"save_time": int(save_info["Farmer"]["saveTime"]),
"year": int(save_info["Farmer"]["yearForSaveGame"]),
"season": int(save_info["Farmer"]["seasonForSaveGame"]),
"day": int(save_info["Farmer"]["dayOfMonthForSaveGame"])
},
"files": {
"save": {
"name": save_data_filename,
"uuid": save_data_file_id[0],
"path": save_data_file_id[1]
},
"saveinfo": {
"name": "SaveGameInfo",
"uuid": save_info_file_id[0],
"path": save_info_file_id[1]
}
}
}
col_saves.insert_one(index)
del save_info, save_data
del index["user"]
del index["files"]
del index["_id"]
col_devices.find_one_and_update({"user": user, "name": device}, {"$set": {"last_save": save_date}})
return UJSONResponse(index)