2 Commits

Author SHA1 Message Date
4b43e76822 logWrite replaced with logging module 2023-06-23 11:25:27 +02:00
23467a88ef pathlib support 2023-06-23 11:17:02 +02:00
7 changed files with 59 additions and 36 deletions

View File

@@ -1,3 +1,4 @@
import logging
import re import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system from os import makedirs, path, remove, system
@@ -43,14 +44,18 @@ from modules.security import (
get_current_active_user, get_current_active_user,
get_user, get_user,
) )
from modules.utils import configGet, logWrite from modules.utils import configGet
logger = logging.getLogger(__name__)
async def compress_image(image_path: str): async def compress_image(image_path: str):
image_type = Magic(mime=True).from_file(image_path) image_type = Magic(mime=True).from_file(image_path)
if image_type not in ["image/jpeg", "image/png"]: if image_type not in ["image/jpeg", "image/png"]:
logWrite(f"Not compressing {image_path} because its mime is '{image_type}'") logger.info(
"Not compressing %s because its mime is '%s'", image_path, image_type
)
return return
size_before = path.getsize(image_path) / 1024 size_before = path.getsize(image_path) / 1024
@@ -66,12 +71,15 @@ async def compress_image(image_path: str):
return return
task.start() task.start()
logWrite(f"Compressing '{path.split(image_path)[-1]}'...") logger.info("Compressing '%s'...", Path(image_path).name)
task.join() task.join()
size_after = path.getsize(image_path) / 1024 size_after = path.getsize(image_path) / 1024
logWrite( logger.info(
f"Compressed '{path.split(image_path)[-1]}' from {size_before} Kb to {size_after} Kb" "Compressed '%s' from %s Kb to %s Kb",
Path(image_path).name,
size_before,
size_after,
) )

View File

@@ -1,3 +1,4 @@
import logging
from datetime import datetime, timedelta from datetime import datetime, timedelta
from uuid import uuid1 from uuid import uuid1
@@ -21,7 +22,9 @@ from modules.security import (
get_user, get_user,
verify_password, verify_password,
) )
from modules.utils import configGet, logWrite from modules.utils import configGet
logger = logging.getLogger(__name__)
async def send_confirmation(user: str, email: str): async def send_confirmation(user: str, email: str):
@@ -41,9 +44,11 @@ async def send_confirmation(user: str, email: str):
col_emails.insert_one( col_emails.insert_one(
{"user": user, "email": email, "used": False, "code": confirmation_code} {"user": user, "email": email, "used": False, "code": confirmation_code}
) )
logWrite(f"Sent confirmation email to '{email}' with code {confirmation_code}") logger.info(
"Sent confirmation email to '%s' with code %s", email, confirmation_code
)
except Exception as exp: except Exception as exp:
logWrite(f"Could not send confirmation email to '{email}' due to: {exp}") logger.error("Could not send confirmation email to '%s' due to: %s", email, exp)
@app.get("/users/me/", response_model=User) @app.get("/users/me/", response_model=User)

View File

@@ -1,12 +1,11 @@
import re import re
from datetime import datetime, timezone from datetime import datetime, timezone
from os import makedirs, path, remove from os import makedirs, remove
from pathlib import Path
from secrets import token_urlsafe from secrets import token_urlsafe
from shutil import move from shutil import move
from typing import Union from typing import Union
from pathlib import Path
import aiofiles import aiofiles
from bson.errors import InvalidId from bson.errors import InvalidId
from bson.objectid import ObjectId from bson.objectid import ObjectId

View File

@@ -37,7 +37,7 @@ def dynamic_import(module_name, py_path):
def dynamic_import_from_src(src, star_import=False): def dynamic_import_from_src(src, star_import=False):
my_py_files = get_py_files(src) my_py_files = get_py_files(src)
for py_file in my_py_files: for py_file in my_py_files:
module_name = path.split(py_file)[-1][:-3] module_name = Path(py_file).stem
print(f"Importing {module_name} extension...", flush=True) print(f"Importing {module_name} extension...", flush=True)
imported_module = dynamic_import(module_name, py_file) imported_module = dynamic_import(module_name, py_file)
if imported_module != None: if imported_module != None:

View File

@@ -1,8 +1,11 @@
import logging
from smtplib import SMTP, SMTP_SSL from smtplib import SMTP, SMTP_SSL
from ssl import create_default_context from ssl import create_default_context
from traceback import print_exc from traceback import print_exc
from modules.utils import configGet, logWrite from modules.utils import configGet
logger = logging.getLogger(__name__)
try: try:
if configGet("use_ssl", "mailer", "smtp") is True: if configGet("use_ssl", "mailer", "smtp") is True:
@@ -10,7 +13,7 @@ try:
configGet("host", "mailer", "smtp"), configGet("host", "mailer", "smtp"),
configGet("port", "mailer", "smtp"), configGet("port", "mailer", "smtp"),
) )
logWrite(f"Initialized SMTP SSL connection") logger.info("Initialized SMTP SSL connection")
elif configGet("use_tls", "mailer", "smtp") is True: elif configGet("use_tls", "mailer", "smtp") is True:
mail_sender = SMTP( mail_sender = SMTP(
configGet("host", "mailer", "smtp"), configGet("host", "mailer", "smtp"),
@@ -18,21 +21,21 @@ try:
) )
mail_sender.starttls(context=create_default_context()) mail_sender.starttls(context=create_default_context())
mail_sender.ehlo() mail_sender.ehlo()
logWrite(f"Initialized SMTP TLS connection") logger.info("Initialized SMTP TLS connection")
else: else:
mail_sender = SMTP( mail_sender = SMTP(
configGet("host", "mailer", "smtp"), configGet("port", "mailer", "smtp") configGet("host", "mailer", "smtp"), configGet("port", "mailer", "smtp")
) )
mail_sender.ehlo() mail_sender.ehlo()
logWrite(f"Initialized SMTP connection") logger.info("Initialized SMTP connection")
except Exception as exp: except Exception as exp:
logWrite(f"Could not initialize SMTP connection to: {exp}") logger.error("Could not initialize SMTP connection to: %s", exp)
print_exc() print_exc()
try: try:
mail_sender.login( mail_sender.login(
configGet("login", "mailer", "smtp"), configGet("password", "mailer", "smtp") configGet("login", "mailer", "smtp"), configGet("password", "mailer", "smtp")
) )
logWrite(f"Successfully initialized mailer") logger.info("Successfully initialized mailer")
except Exception as exp: except Exception as exp:
logWrite(f"Could not login into provided SMTP account due to: {exp}") logger.error("Could not login into provided SMTP account due to: %s", exp)

View File

@@ -1,21 +1,18 @@
from traceback import print_exc import logging
from pathlib import Path
from traceback import format_exc
from typing import Any, Union from typing import Any, Union
from ujson import JSONDecodeError, dumps, loads from ujson import JSONDecodeError, dumps, loads
logger = logging.getLogger(__name__)
# Print to stdout and then to log
def logWrite(message: str, debug: bool = False) -> None:
# save to log file and rotation is to be done
# logAppend(f'{message}', debug=debug)
print(f"{message}", flush=True)
def jsonLoad(filepath: str) -> Any: def jsonLoad(filepath: Union[str, Path]) -> Any:
"""Load json file """Load json file
### Args: ### Args:
* filepath (`str`): Path to input file * filepath (`Union[str, Path]`): Path to input file
### Returns: ### Returns:
* `Any`: Some json deserializable * `Any`: Some json deserializable
@@ -24,32 +21,36 @@ def jsonLoad(filepath: str) -> Any:
try: try:
output = loads(file.read()) output = loads(file.read())
except JSONDecodeError: except JSONDecodeError:
logWrite( logger.error(
f"Could not load json file {filepath}: file seems to be incorrect!\n{print_exc()}" "Could not load json file %s: file seems to be incorrect!\n%s",
filepath,
format_exc(),
) )
raise raise
except FileNotFoundError: except FileNotFoundError:
logWrite( logger.error(
f"Could not load json file {filepath}: file does not seem to exist!\n{print_exc()}" "Could not load json file %s: file does not seem to exist!\n%s",
filepath,
format_exc(),
) )
raise raise
file.close() file.close()
return output return output
def jsonSave(contents: Union[list, dict], filepath: str) -> None: def jsonSave(contents: Union[list, dict], filepath: Union[str, Path]) -> None:
"""Save contents into json file """Save contents into json file
### Args: ### Args:
* contents (`Union[list, dict]`): Some json serializable * contents (`Union[list, dict]`): Some json serializable
* filepath (`str`): Path to output file * filepath (`Union[str, Path]`): Path to output file
""" """
try: try:
with open(filepath, "w", encoding="utf8") as file: with open(filepath, "w", encoding="utf8") as file:
file.write(dumps(contents, ensure_ascii=False, indent=4)) file.write(dumps(contents, ensure_ascii=False, indent=4))
file.close() file.close()
except Exception as exp: except Exception as exp:
logWrite(f"Could not save json file {filepath}: {exp}\n{print_exc()}") logger.error("Could not save json file %s: %s\n%s", filepath, exp, format_exc())
return return
@@ -63,7 +64,7 @@ def configGet(key: str, *args: str) -> Any:
### Returns: ### Returns:
* `Any`: Value of provided key * `Any`: Value of provided key
""" """
this_dict = jsonLoad("config.json") this_dict = jsonLoad(Path("config.json"))
this_key = this_dict this_key = this_dict
for dict_key in args: for dict_key in args:
this_key = this_key[dict_key] this_key = this_key[dict_key]

View File

@@ -1,3 +1,4 @@
import logging
from os import makedirs from os import makedirs
from pathlib import Path from pathlib import Path
@@ -9,6 +10,12 @@ from modules.scheduler import scheduler
makedirs(Path("data/users"), exist_ok=True) makedirs(Path("data/users"), exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]",
)
@app.get("/favicon.ico", response_class=FileResponse, include_in_schema=False) @app.get("/favicon.ico", response_class=FileResponse, include_in_schema=False)
async def favicon(): async def favicon():