SyncAPI/extensions/saves.py

219 lines
8.4 KiB
Python

from datetime import datetime
from urllib.parse import quote_plus
from os import remove
from typing import Dict, List, Literal, Union
from xmltodict import parse
from pymongo import DESCENDING
from models.saves import StardewSave, StardewSaveBrief
from modules.app import app, get_api_key, user_by_key
from modules.database import col_devices, col_saves
from fastapi import HTTPException, Depends, UploadFile
from fastapi.responses import UJSONResponse, FileResponse, Response
from fastapi.openapi.models import APIKey
from starlette.status import HTTP_204_NO_CONTENT, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_406_NOT_ACCEPTABLE
from modules.utils import configGet, zip_saves
@app.get(
    "/saves",
    response_class=UJSONResponse,
    response_model=Union[List[Dict[str, StardewSave]], List[StardewSaveBrief]],
    description="Get all available game saves",
)
async def saves_get(
    device: Union[str, None] = None,
    version: Union[str, None] = None,
    only_ids: bool = False,
    apikey: APIKey = Depends(get_api_key),
):
    """List the caller's saves, most recent ``date`` first.

    Args:
        device: When given, only saves whose ``device`` field equals it.
        version: When given, only saves whose ``data.game_version`` equals it.
        only_ids: When true, keep a single entry per save ``id`` -- the first
            one met in the date-descending result.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        ``UJSONResponse`` holding a list of save documents with the ``_id``,
        ``user`` and ``file`` fields removed.

    Raises:
        HTTPException: 404 when no save matches the filters.
    """
    query = {"user": user_by_key(apikey)}
    if device is not None:
        query["device"] = device
    if version is not None:
        query["data.game_version"] = version

    saves_entries = list(col_saves.find(query).sort("date", DESCENDING))
    if not saves_entries:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find any saves.")

    output = []
    # A set keeps the duplicate check O(1) per entry instead of rescanning a list.
    seen_ids = set()
    for entry in saves_entries:
        if only_ids:
            if entry["id"] in seen_ids:
                continue
            seen_ids.add(entry["id"])
        # Storage-only fields are not part of the public payload
        # (``_id`` is an ObjectId, which the JSON encoder cannot serialise).
        del entry["_id"]
        del entry["user"]
        del entry["file"]
        output.append(entry)

    return UJSONResponse(output)
@app.get(
    "/saves/{id}",
    response_class=UJSONResponse,
    response_model=List[StardewSave],
    description="Get game saves by name",
)
async def saves_get_by_id(
    id: int,
    device: Union[str, None] = None,
    version: Union[str, None] = None,
    apikey: APIKey = Depends(get_api_key),
):
    """List every upload of one save that belongs to the caller, most recent first.

    Args:
        id: Value matched against the ``id`` field of stored saves.
        device: When given, only saves whose ``device`` field equals it.
        version: When given, only saves whose ``data.game_version`` equals it.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        ``UJSONResponse`` holding a list of save documents with the ``_id``,
        ``file`` and ``user`` fields removed.

    Raises:
        HTTPException: 404 when the caller has no save matching the filters.
    """
    # Scope the lookup to the caller: without the ``user`` filter any valid
    # API key could read saves uploaded by other users.
    query = {"user": user_by_key(apikey), "id": id}
    if device is not None:
        query["device"] = device
    if version is not None:
        query["data.game_version"] = version

    saves_entries = list(col_saves.find(query).sort("date", DESCENDING))
    if not saves_entries:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    for entry in saves_entries:
        # Storage-only fields are not part of the public payload.
        del entry["_id"]
        del entry["file"]
        del entry["user"]

    return UJSONResponse(saves_entries)
@app.get(
    "/saves/{id}/{save_date}",
    response_class=UJSONResponse,
    response_model=List[StardewSave],
    description="Get game saves by id and upload date",
)
async def saves_get_by_both_ids(
    id: int,
    save_date: Union[int, Literal["latest"]],
    device: Union[str, None] = None,
    apikey: APIKey = Depends(get_api_key),
):
    """Return a single upload of one of the caller's saves.

    Args:
        id: Value matched against the ``id`` field of stored saves.
        save_date: Exact ``date`` value of the upload, or the string
            ``"latest"`` to pick the upload with the greatest ``date``.
        device: When given, only saves whose ``device`` field equals it.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        ``UJSONResponse`` holding the save document with the ``_id``, ``file``
        and ``user`` fields removed.

    Raises:
        HTTPException: 404 when no matching save exists.
    """
    query = {"user": user_by_key(apikey), "id": id}
    if device is not None:
        query["device"] = device

    if save_date == "latest":
        # "latest" is not a stored value: filtering on it could never match,
        # so resolve it by taking the first document in date-descending order.
        saves_entry = col_saves.find_one(query, sort=[("date", DESCENDING)])
    else:
        query["date"] = save_date
        saves_entry = col_saves.find_one(query)

    if saves_entry is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    # Strip storage-only fields, matching the other save-returning endpoints.
    del saves_entry["_id"]
    del saves_entry["file"]
    del saves_entry["user"]
    return UJSONResponse(saves_entry)
@app.delete("/saves/{id}", description="Delete game saves by id")
async def saves_delete_by_id(id: int, apikey: APIKey = Depends(get_api_key)):
    """Delete every upload of one of the caller's saves, archives included.

    Args:
        id: Value matched against the ``id`` field of stored saves.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        An empty 204 response once the records are deleted.

    Raises:
        HTTPException: 404 when the caller has no save with that id.
    """
    query = {"user": user_by_key(apikey), "id": id}

    # One fetch serves both as the existence check and as the file list.
    saves_entries = list(col_saves.find(query))
    if not saves_entries:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    for entry in saves_entries:
        try:
            remove(entry["file"]["path"])
        except FileNotFoundError:
            # The archive is already gone; the record must still be deleted,
            # otherwise the save could never be removed through the API.
            pass

    col_saves.delete_many(query)
    return Response(status_code=HTTP_204_NO_CONTENT)
@app.delete(
    "/saves/{id}/{save_date}",
    response_class=UJSONResponse,
    description="Delete game saves by id and upload date",
)
async def saves_delete_by_both_ids(id: int, save_date: int, apikey: APIKey = Depends(get_api_key)):
    """Delete a single upload of one of the caller's saves, archive included.

    Args:
        id: Value matched against the ``id`` field of stored saves.
        save_date: Exact ``date`` value of the upload to delete.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        An empty 204 response once the record is deleted.

    Raises:
        HTTPException: 404 when the caller has no such save.
    """
    # Scope the deletion to the caller: without the ``user`` filter any valid
    # API key could delete saves uploaded by other users.
    saves_entry = col_saves.find_one_and_delete(
        {"user": user_by_key(apikey), "id": id, "date": save_date}
    )
    if saves_entry is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    try:
        remove(saves_entry["file"]["path"])
    except FileNotFoundError:
        # The record is already deleted at this point; a missing archive
        # leaves nothing to clean up and should not turn into a server error.
        pass

    return Response(status_code=HTTP_204_NO_CONTENT)
@app.get(
    "/saves/{id}/{save_date}/download",
    response_class=FileResponse,
    description="Get game save as .svsave file by its id and upload date",
)
async def saves_download(
    id: int,
    save_date: int,
    device: Union[str, None] = None,
    apikey: APIKey = Depends(get_api_key),
):
    """Send the stored archive of one of the caller's saves as an attachment.

    Args:
        id: Value matched against the ``id`` field of stored saves.
        save_date: Exact ``date`` value of the upload.
        device: When given, the save's ``device`` field must equal it.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        ``Response`` whose body is the raw content of the file at
        ``file.path``, offered for download as ``<url-quoted file.name>.svsave``.

    Raises:
        HTTPException: 404 when no matching save exists.
    """
    lookup = {"user": user_by_key(apikey), "id": id}
    if device is not None:
        lookup["device"] = device
    lookup["date"] = save_date

    record = col_saves.find_one(lookup)
    if record is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find save with such id.")

    stored = record["file"]
    with open(stored["path"], "rb") as archive:
        payload = archive.read()

    disposition = f'attachment;filename={quote_plus(stored["name"])}.svsave'
    return Response(
        payload,
        media_type="application/x-zip-compressed",
        headers={'Content-Disposition': disposition},
    )
@app.post("/saves", response_class=UJSONResponse, response_model=StardewSave, description="Upload new save")
async def saves_post(device: str, files: List[UploadFile], apikey: APIKey = Depends(get_api_key)):
    """Store a newly uploaded save for the caller.

    Exactly two files are expected: one named ``SaveGameInfo`` (XML with a
    ``Farmer`` root) and the save itself (XML with a ``SaveGame`` root). Both
    are archived through ``zip_saves`` and indexed in ``col_saves``.

    Args:
        device: Name of a device already registered for the caller.
        files: The two uploaded files described above.
        apikey: API key resolved by ``get_api_key``; mapped to a user through
            ``user_by_key``.

    Returns:
        ``UJSONResponse`` with the stored index document minus its ``user``,
        ``file`` and ``_id`` fields, or a 403 JSON response when the per-user
        save limit is reached.

    Raises:
        HTTPException: 406 when the upload is not a valid save/SaveGameInfo
            pair; 404 when ``device`` is not registered for the caller.
    """
    # Imported here because it is needed only to recognise malformed XML uploads.
    from xml.parsers.expat import ExpatError

    user = user_by_key(apikey)
    error_return = HTTPException(HTTP_406_NOT_ACCEPTABLE, detail="You must provide two files: save file and SaveGameInfo for that save")

    # A configured limit of -1 disables the per-user cap.
    saves_limit = configGet("saves", "limits")
    if (saves_limit != -1) and (col_saves.count_documents({"user": user}) >= saves_limit):
        return UJSONResponse(
            {
                "detail": f'Too many save files. This instance allows to store only {saves_limit} saves per user',
                "limit": saves_limit
            },
            status_code=HTTP_403_FORBIDDEN
        )

    # The exception has to be raised: a returned HTTPException instance is
    # treated as the response payload, so the client would never get the 406.
    if len(files) != 2:
        raise error_return
    if "SaveGameInfo" not in [file.filename for file in files]:
        raise error_return

    if col_devices.find_one({"user": user, "name": device}) is None:
        raise HTTPException(HTTP_404_NOT_FOUND, detail="Could not find device with that name.")

    save_info = save_data = save_info_file = save_data_filename = save_data_file = None

    for file in files:
        raw = await file.read()
        try:
            parsed = parse(raw.decode("utf-8"))
        except (UnicodeDecodeError, ExpatError):
            # Not UTF-8 or not well-formed XML: reject the upload instead of
            # failing with an unhandled server error.
            raise error_return from None

        if file.filename == "SaveGameInfo":
            if "Farmer" not in parsed:
                raise error_return
            save_info_file = raw
            save_info = parsed
        else:
            if "SaveGame" not in parsed:
                raise error_return
            save_data_filename = file.filename
            save_data_file = raw
            save_data = parsed

    # Reached with a None when, for example, both files were named SaveGameInfo.
    if save_info is None or save_data is None or save_info_file is None or save_data_filename is None or save_data_file is None:
        raise error_return

    zipped = zip_saves(save_data_filename, save_data_file, save_info_file)

    # NOTE(review): utcnow() returns a naive datetime and .timestamp() reads a
    # naive value as local time, so this is a true Unix timestamp only when the
    # server runs in UTC. Left unchanged to stay comparable with stored dates.
    save_date = int(datetime.utcnow().timestamp())

    farmer = save_info["Farmer"]
    index = {
        "id": int(save_data["SaveGame"]["uniqueIDForThisGame"]),
        "user": user,
        "device": device,
        "date": save_date,
        "data": {
            "farmer": farmer["name"],
            "money": int(farmer["money"]),
            "played": int(farmer["millisecondsPlayed"]),
            "save_time": int(farmer["saveTime"]),
            "year": int(farmer["yearForSaveGame"]),
            "season": int(farmer["seasonForSaveGame"]),
            "day": int(farmer["dayOfMonthForSaveGame"]),
            "game_version": farmer["gameVersion"]
        },
        "file": {
            "name": save_data_filename,
            "uuid": zipped[0],
            "path": zipped[1]
        }
    }

    col_saves.insert_one(index)

    # Drop the references to the parsed XML trees; they are no longer needed.
    del save_info, save_data, farmer

    # insert_one() added "_id" to the dict in place; strip it together with
    # the other storage-only fields before echoing the document back.
    del index["user"]
    del index["file"]
    del index["_id"]

    col_devices.find_one_and_update({"user": user, "name": device}, {"$set": {"last_save": save_date}})

    return UJSONResponse(index)