Divided users and security extensions

This commit is contained in:
Profitroll 2023-01-09 15:01:30 +01:00
parent 31d42f3ce7
commit 19b1ae6158
2 changed files with 77 additions and 71 deletions

View File

@ -1,16 +1,8 @@
from datetime import datetime, timedelta from datetime import timedelta
from modules.database import col_users, col_albums, col_photos, col_emails, col_videos, col_emails, col_tokens
from modules.app import app from modules.app import app
from modules.utils import configGet, logWrite from modules.utils import configGet
from modules.scheduler import scheduler
from modules.mailer import mail_sender
from uuid import uuid1
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from fastapi import Depends, HTTPException from fastapi import Depends, HTTPException
from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_406_NOT_ACCEPTABLE
from fastapi.security import ( from fastapi.security import (
OAuth2PasswordRequestForm, OAuth2PasswordRequestForm,
) )
@ -18,29 +10,11 @@ from fastapi.security import (
from modules.security import ( from modules.security import (
ACCESS_TOKEN_EXPIRE_DAYS, ACCESS_TOKEN_EXPIRE_DAYS,
Token, Token,
User,
authenticate_user, authenticate_user,
create_access_token, create_access_token
get_current_active_user,
get_password_hash,
get_user,
verify_password
) )
async def send_confirmation(user: str, email: str):
confirmation_code = str(uuid1())
try:
mail_sender.sendmail(
from_addr=configGet("sender", "mailer", "smtp"),
to_addrs=email,
msg=f'From: {configGet("sender", "mailer", "smtp")}\nSubject: Email confirmation\n\n'+configGet("message", "mailer", "messages", "registration_confirmation").format(configGet("external_address")+f"/users/{user}/confirm?code={confirmation_code}")
)
col_emails.insert_one( {"user": user, "email": email, "used": False, "code": confirmation_code} )
except Exception as exp:
logWrite(f"Could not send confirmation email to '{email}' due to: {exp}")
@app.post("/token", response_model=Token) @app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()): async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password) user = authenticate_user(form_data.username, form_data.password)
@ -52,44 +26,3 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
expires_delta=access_token_expires, expires_delta=access_token_expires,
) )
return {"access_token": access_token, "token_type": "bearer"} return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
if configGet("registration_requires_confirmation") is True:
@app.get("/users/{user}/confirm")
@app.patch("/users/{user}/confirm")
async def confirm_users(user: str, code: str):
confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} )
if confirm_record is None:
return HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("email_code_invalid", "messages"))
col_emails.find_one_and_update( {"_id": confirm_record["_id"]}, {"$set": {"used": True}} )
col_users.find_one_and_update( {"user": confirm_record["user"]}, {"$set": {"disabled": False}} )
return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} )
if configGet("registration_enabled") is True:
@app.post("/users")
async def create_users(user: str, email: str, password: str):
if col_users.find_one( {"user": user} ) is not None:
return HTTPException(HTTP_406_NOT_ACCEPTABLE, detail=configGet("user_already_exists", "messages"))
col_users.insert_one( {"user": user, "email": email, "hash": get_password_hash(password), "disabled": configGet("registration_requires_confirmation")} )
if configGet("registration_requires_confirmation") is True:
scheduler.add_job( send_confirmation, trigger="date", run_date=datetime.now()+timedelta(seconds=1), kwargs={"user": user, "email": email} )
return Response(status_code=HTTP_204_NO_CONTENT)
@app.delete("/users/me/")
async def delete_users(password: str, current_user: User = Depends(get_current_active_user)):
user = get_user(current_user.user)
if not user:
return False
if not verify_password(password, user.hash):
return HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("credentials_invalid", "messages"))
col_users.delete_many( {"user": current_user.user} )
col_emails.delete_many( {"user": current_user.user} )
col_photos.delete_many( {"user": current_user.user} )
col_videos.delete_many( {"user": current_user.user} )
col_albums.delete_many( {"user": current_user.user} )

73
extensions/users.py Normal file
View File

@ -0,0 +1,73 @@
from datetime import datetime, timedelta
from modules.database import col_users, col_albums, col_photos, col_emails, col_videos, col_emails
from modules.app import app
from modules.utils import configGet, logWrite
from modules.scheduler import scheduler
from modules.mailer import mail_sender
from uuid import uuid1
from fastapi import Depends, HTTPException
from fastapi.responses import Response, UJSONResponse
from starlette.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_406_NOT_ACCEPTABLE
from modules.security import (
User,
get_current_active_user,
get_password_hash,
get_user,
verify_password
)
async def send_confirmation(user: str, email: str):
confirmation_code = str(uuid1())
try:
mail_sender.sendmail(
from_addr=configGet("sender", "mailer", "smtp"),
to_addrs=email,
msg=f'From: {configGet("sender", "mailer", "smtp")}\nSubject: Email confirmation\n\n'+configGet("message", "mailer", "messages", "registration_confirmation").format(configGet("external_address")+f"/users/{user}/confirm?code={confirmation_code}")
)
col_emails.insert_one( {"user": user, "email": email, "used": False, "code": confirmation_code} )
except Exception as exp:
logWrite(f"Could not send confirmation email to '{email}' due to: {exp}")
@app.get("/users/me/", response_model=User)
async def user_me(current_user: User = Depends(get_current_active_user)):
return current_user
if configGet("registration_requires_confirmation") is True:
@app.get("/users/{user}/confirm")
@app.patch("/users/{user}/confirm")
async def user_confirm(user: str, code: str):
confirm_record = col_emails.find_one( {"user": user, "code": code, "used": False} )
if confirm_record is None:
return HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("email_code_invalid", "messages"))
col_emails.find_one_and_update( {"_id": confirm_record["_id"]}, {"$set": {"used": True}} )
col_users.find_one_and_update( {"user": confirm_record["user"]}, {"$set": {"disabled": False}} )
return UJSONResponse( {"detail": configGet("email_confirmed", "messages")} )
if configGet("registration_enabled") is True:
@app.post("/users")
async def user_create(user: str, email: str, password: str):
if col_users.find_one( {"user": user} ) is not None:
return HTTPException(HTTP_406_NOT_ACCEPTABLE, detail=configGet("user_already_exists", "messages"))
col_users.insert_one( {"user": user, "email": email, "hash": get_password_hash(password), "disabled": configGet("registration_requires_confirmation")} )
if configGet("registration_requires_confirmation") is True:
scheduler.add_job( send_confirmation, trigger="date", run_date=datetime.now()+timedelta(seconds=1), kwargs={"user": user, "email": email} )
return Response(status_code=HTTP_204_NO_CONTENT)
@app.delete("/users/me/")
async def user_delete(password: str, current_user: User = Depends(get_current_active_user)):
user = get_user(current_user.user)
if not user:
return False
if not verify_password(password, user.hash):
return HTTPException(HTTP_400_BAD_REQUEST, detail=configGet("credentials_invalid", "messages"))
col_users.delete_many( {"user": current_user.user} )
col_emails.delete_many( {"user": current_user.user} )
col_photos.delete_many( {"user": current_user.user} )
col_videos.delete_many( {"user": current_user.user} )
col_albums.delete_many( {"user": current_user.user} )