PhotosAPI/extensions/security.py
2023-01-07 21:48:43 +01:00

95 lines
4.3 KiB
Python

from datetime import datetime, timedelta
from modules.database import col_users, col_albums, col_photos, col_emails, col_videos, col_emails, col_tokens
from modules.app import app
from modules.utils import configGet, logWrite
from modules.scheduler import scheduler
from modules.mailer import mail_sender
from uuid import uuid1
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from fastapi import Depends, HTTPException
from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_406_NOT_ACCEPTABLE
from fastapi.security import (
OAuth2PasswordRequestForm,
)
from modules.security import (
ACCESS_TOKEN_EXPIRE_DAYS,
Token,
User,
authenticate_user,
create_access_token,
get_current_active_user,
get_password_hash,
get_user,
verify_password
)
async def send_confirmation(user: str, email: str):
confirmation_code = str(uuid1())
try:
mail_sender.sendmail(
from_addr=configGet("sender", "mailer", "smtp"),
to_addrs=email,
msg=f'From: {configGet("sender", "mailer", "smtp")}\nSubject: Email confirmation\n\n'+configGet("message", "mailer", "messages", "registration_confirmation").format(configGet("external_address")+f"/users/{user}/confirm?code={confirmation_code}")
)
col_emails.insert_one( {"user": user, "email": email, "used": False, "code": confirmation_code} )
except Exception as exp:
logWrite(f"Could not send confirmation email to '{email}' due to: {exp}")
@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail=configGet("credentials_invalid", "messages"))
access_token_expires = timedelta(days=ACCESS_TOKEN_EXPIRE_DAYS)
access_token = create_access_token(
data={"sub": user.user, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
if configGet("registration_requires_confirmation") is True:
@app.get("/users/{user}/confirm")
@app.patch("/users/{user}/confirm")
async def confirm_users(user: str, code: str):
confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} )
if confirm_record is None:
return HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("email_code_invalid", "messages"))
col_emails.find_one_and_update( {"_id": confirm_record["_id"]}, {"$set": {"used": True}} )
col_users.find_one_and_update( {"user": confirm_record["user"]}, {"$set": {"disabled": False}} )
return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} )
if configGet("registration_enabled") is True:
@app.post("/users")
async def create_users(user: str, email: str, password: str):
if col_users.find_one( {"user": user} ) is not None:
return HTTPException(HTTP_406_NOT_ACCEPTABLE, detail=configGet("user_already_exists", "messages"))
col_users.insert_one( {"user": user, "email": email, "hash": get_password_hash(password), "disabled": configGet("registration_requires_confirmation")} )
if configGet("registration_requires_confirmation") is True:
scheduler.add_job( send_confirmation, trigger="date", run_date=datetime.now()+timedelta(seconds=1), kwargs={"user": user, "email": email} )
return Response(status_code=HTTP_204_NO_CONTENT)
@app.delete("/users/me/")
async def delete_users(password: str, current_user: User = Depends(get_current_active_user)):
user = get_user(current_user.user)
if not user:
return False
if not verify_password(password, user.hash):
return HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("credentials_invalid", "messages"))
col_users.delete_many( {"user": current_user.user} )
col_emails.delete_many( {"user": current_user.user} )
col_photos.delete_many( {"user": current_user.user} )
col_videos.delete_many( {"user": current_user.user} )
col_albums.delete_many( {"user": current_user.user} )