Media access tokens are now an option

This commit is contained in:
Profitroll 2023-02-18 11:07:46 +01:00
parent c272342b4b
commit 09ec0f4620
2 changed files with 52 additions and 41 deletions

View File

@ -10,6 +10,8 @@
"email_confirmed": "Email confirmed. You can now log in."
},
"external_address": "localhost",
"media_token_access": false,
"media_token_valid_hours": 12,
"registration_enabled": true,
"registration_requires_confirmation": true,
"mailer": {

View File

@ -29,7 +29,7 @@ from fastapi.responses import UJSONResponse, Response
from fastapi.exceptions import HTTPException
from starlette.status import HTTP_204_NO_CONTENT, HTTP_401_UNAUTHORIZED, HTTP_409_CONFLICT
from modules.utils import logWrite
from modules.utils import configGet, logWrite
async def compress_image(image_path: str):
@ -94,12 +94,15 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F
duplicates = await get_duplicates(file_hash, album)
if len(duplicates) > 0 and ignore_duplicates is False:
duplicates_ids = []
for entry in duplicates:
duplicates_ids.append(entry["id"])
access_token = create_access_token(data={"sub": current_user.user, "scopes": ["me", "photos.read"], "allowed": duplicates_ids}, expires_delta=timedelta(hours=1))
access_token_short = uuid4().hex[:12].lower()
col_tokens.insert_one({"short": access_token_short, "access_token": access_token, "photos": duplicates_ids})
if configGet("media_token_access") is True:
duplicates_ids = []
for entry in duplicates:
duplicates_ids.append(entry["id"])
access_token = create_access_token(data={"sub": current_user.user, "scopes": ["me", "photos.read"], "allowed": duplicates_ids}, expires_delta=timedelta(hours=configGet("media_token_valid_hours")))
access_token_short = uuid4().hex[:12].lower()
col_tokens.insert_one({"short": access_token_short, "access_token": access_token, "photos": duplicates_ids})
else:
access_token_short = None
return UJSONResponse(
{
"detail": "Image duplicates found. Pass 'ignore_duplicates=true' to ignore.",
@ -149,51 +152,57 @@ async def photo_upload(file: UploadFile, album: str, ignore_duplicates: bool = F
}
)
photo_get_token_responses = {
401: AccessTokenInvalidError().openapi,
404: PhotoNotFoundError("id").openapi
}
@app.get("/token/photo/{token}", description="Get a photo by its duplicate token", responses=photo_get_token_responses)
async def photo_get_token(token: str, id: int):
# Access to photos y token generated for example by
# upload method when duplicates are found. Is disabled
# by default and should remain so if not really needed.
if configGet("media_token_access") is True:
db_entry = col_tokens.find_one({"short": token})
photo_get_token_responses = {
401: AccessTokenInvalidError().openapi,
404: PhotoNotFoundError("id").openapi
}
if db_entry is None:
raise AccessTokenInvalidError()
@app.get("/token/photo/{token}", description="Get a photo by its duplicate token", responses=photo_get_token_responses)
async def photo_get_token(token: str, id: int):
token = db_entry["access_token"]
id = db_entry["photos"][id]
db_entry = col_tokens.find_one({"short": token})
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user: str = payload.get("sub")
if user is None:
if db_entry is None:
raise AccessTokenInvalidError()
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, user=user)
except (JWTError, ValidationError) as exp:
print(exp, flush=True)
raise AccessTokenInvalidError()
user = get_user(user=token_data.user)
token = db_entry["access_token"]
id = db_entry["photos"][id]
if id not in payload.get("allowed", []):
raise AccessTokenInvalidError()
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user: str = payload.get("sub")
if user is None:
raise AccessTokenInvalidError()
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, user=user)
except (JWTError, ValidationError) as exp:
print(exp, flush=True)
raise AccessTokenInvalidError()
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
user = get_user(user=token_data.user)
image_path = path.join("data", "users", user.user, "albums", image["album"], image["filename"])
if id not in payload.get("allowed", []):
raise AccessTokenInvalidError()
mime = Magic(mime=True).from_file(image_path)
try:
image = col_photos.find_one( {"_id": ObjectId(id)} )
if image is None:
raise InvalidId(id)
except InvalidId:
raise PhotoNotFoundError(id)
with open(image_path, "rb") as f: image_file = f.read()
image_path = path.join("data", "users", user.user, "albums", image["album"], image["filename"])
return Response(image_file, media_type=mime)
mime = Magic(mime=True).from_file(image_path)
with open(image_path, "rb") as f: image_file = f.read()
return Response(image_file, media_type=mime)
photo_get_responses = {
404: PhotoNotFoundError("id").openapi