WIP: pathlib support

This commit is contained in:
Profitroll 2023-06-23 08:51:42 +00:00
parent a5cd6a215f
commit 88d8a38444
Signed by: profitroll
GPG Key ID: FA35CAB49DACD3B2
6 changed files with 50 additions and 91 deletions

View File

@ -1,5 +1,6 @@
import re import re
from os import makedirs, path, rename from os import makedirs, rename
from pathlib import Path
from shutil import rmtree from shutil import rmtree
from typing import Union from typing import Union
@ -49,9 +50,7 @@ async def album_create(
if col_albums.find_one({"name": name}) is not None: if col_albums.find_one({"name": name}) is not None:
raise AlbumAlreadyExistsError(name) raise AlbumAlreadyExistsError(name)
makedirs( makedirs(Path(f"data/users/{current_user.user}/albums/{name}"), exist_ok=True)
path.join("data", "users", current_user.user, "albums", name), exist_ok=True
)
uploaded = col_albums.insert_one( uploaded = col_albums.insert_one(
{"user": current_user.user, "name": name, "title": title, "cover": None} {"user": current_user.user, "name": name, "title": title, "cover": None}
@ -123,8 +122,8 @@ async def album_patch(
if 2 > len(name) > 20: if 2 > len(name) > 20:
raise AlbumIncorrectError("name", "must be >2 and <20 characters.") raise AlbumIncorrectError("name", "must be >2 and <20 characters.")
rename( rename(
path.join("data", "users", current_user.user, "albums", album["name"]), Path(f"data/users/{current_user.user}/albums/{album['name']}"),
path.join("data", "users", current_user.user, "albums", name), Path(f"data/users/{current_user.user}/albums/{name}"),
) )
col_photos.update_many( col_photos.update_many(
{"user": current_user.user, "album": album["name"]}, {"user": current_user.user, "album": album["name"]},
@ -186,8 +185,8 @@ async def album_put(
cover = image["_id"].__str__() if image is not None else None # type: ignore cover = image["_id"].__str__() if image is not None else None # type: ignore
rename( rename(
path.join("data", "users", current_user.user, "albums", album["name"]), Path(f"data/users/{current_user.user}/albums/{album['name']}"),
path.join("data", "users", current_user.user, "albums", name), Path(f"data/users/{current_user.user}/albums/{name}"),
) )
col_photos.update_many( col_photos.update_many(
@ -222,6 +221,6 @@ async def album_delete(
col_photos.delete_many({"album": album["name"]}) col_photos.delete_many({"album": album["name"]})
rmtree(path.join("data", "users", current_user.user, "albums", album["name"])) rmtree(Path(f"data/users/{current_user.user}/albums/{album['name']}"))
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)

View File

@ -1,4 +1,4 @@
from os import path from pathlib import Path
import aiofiles import aiofiles
from fastapi.responses import HTMLResponse, Response from fastapi.responses import HTMLResponse, Response
@ -8,27 +8,21 @@ from modules.app import app
@app.get("/pages/matter.css", include_in_schema=False) @app.get("/pages/matter.css", include_in_schema=False)
async def page_matter(): async def page_matter():
async with aiofiles.open( async with aiofiles.open(Path("pages/matter.css"), "r", encoding="utf-8") as f:
path.join("pages", "matter.css"), "r", encoding="utf-8"
) as f:
output = await f.read() output = await f.read()
return Response(content=output) return Response(content=output)
@app.get("/pages/{page}/{file}", include_in_schema=False) @app.get("/pages/{page}/{file}", include_in_schema=False)
async def page_assets(page: str, file: str): async def page_assets(page: str, file: str):
async with aiofiles.open( async with aiofiles.open(Path(f"pages/{page}/{file}"), "r", encoding="utf-8") as f:
path.join("pages", page, file), "r", encoding="utf-8"
) as f:
output = await f.read() output = await f.read()
return Response(content=output) return Response(content=output)
@app.get("/", include_in_schema=False) @app.get("/", include_in_schema=False)
async def page_home(): async def page_home():
async with aiofiles.open( async with aiofiles.open(Path("pages/home/index.html"), "r", encoding="utf-8") as f:
path.join("pages", "home", "index.html"), "r", encoding="utf-8"
) as f:
output = await f.read() output = await f.read()
return HTMLResponse(content=output) return HTMLResponse(content=output)
@ -36,7 +30,7 @@ async def page_home():
@app.get("/register", include_in_schema=False) @app.get("/register", include_in_schema=False)
async def page_register(): async def page_register():
async with aiofiles.open( async with aiofiles.open(
path.join("pages", "register", "index.html"), "r", encoding="utf-8" Path("pages/register/index.html"), "r", encoding="utf-8"
) as f: ) as f:
output = await f.read() output = await f.read()
return HTMLResponse(content=output) return HTMLResponse(content=output)

View File

@ -1,6 +1,7 @@
import re import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
from os import makedirs, path, remove, system from os import makedirs, path, remove, system
from pathlib import Path
from secrets import token_urlsafe from secrets import token_urlsafe
from shutil import move from shutil import move
from threading import Thread from threading import Thread
@ -109,15 +110,11 @@ async def photo_upload(
if col_albums.find_one({"user": current_user.user, "name": album}) is None: if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album) raise AlbumNameNotFoundError(album)
makedirs( makedirs(Path(f"data/users/{current_user.user}/albums/{album}"), exist_ok=True)
path.join("data", "users", current_user.user, "albums", album), exist_ok=True
)
filename = file.filename filename = file.filename
if path.exists( if Path(f"data/users/{current_user.user}/albums/{album}/{file.filename}").exists():
path.join("data", "users", current_user.user, "albums", album, file.filename)
):
base_name = file.filename.split(".")[:-1] base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1] extension = file.filename.split(".")[-1]
filename = ( filename = (
@ -125,12 +122,12 @@ async def photo_upload(
) )
async with aiofiles.open( async with aiofiles.open(
path.join("data", "users", current_user.user, "albums", album, filename), "wb" Path(f"data/users/{current_user.user}/albums/{album}/{filename}"), "wb"
) as f: ) as f:
await f.write(await file.read()) await f.write(await file.read())
file_hash = await get_phash( file_hash = await get_phash(
path.join("data", "users", current_user.user, "albums", album, filename) Path(f"data/users/{current_user.user}/albums/{album}/{filename}")
) )
duplicates = await get_duplicates(file_hash, album) duplicates = await get_duplicates(file_hash, album)
@ -168,7 +165,7 @@ async def photo_upload(
try: try:
coords = extract_location( coords = extract_location(
path.join("data", "users", current_user.user, "albums", album, filename) Path(f"data/users/{current_user.user}/albums/{album}/{filename}")
) )
except (UnpackError, ValueError): except (UnpackError, ValueError):
coords = {"lng": 0.0, "lat": 0.0, "alt": 0.0} coords = {"lng": 0.0, "lat": 0.0, "alt": 0.0}
@ -193,9 +190,7 @@ async def photo_upload(
compress_image, compress_image,
trigger="date", trigger="date",
run_date=datetime.now() + timedelta(seconds=1), run_date=datetime.now() + timedelta(seconds=1),
args=[ args=[Path(f"data/users/{current_user.user}/albums/{album}/{filename}")],
path.join("data", "users", current_user.user, "albums", album, filename)
],
) )
return UJSONResponse( return UJSONResponse(
@ -254,8 +249,8 @@ if configGet("media_token_access") is True:
except InvalidId: except InvalidId:
raise PhotoNotFoundError(id) raise PhotoNotFoundError(id)
image_path = path.join( image_path = Path(
"data", "users", user.user, "albums", image["album"], image["filename"] f"data/users/{user.user}/albums/{image['album']}/{image['filename']}"
) )
mime = Magic(mime=True).from_file(image_path) mime = Magic(mime=True).from_file(image_path)
@ -299,8 +294,8 @@ async def photo_get(
except InvalidId: except InvalidId:
raise PhotoNotFoundError(id) raise PhotoNotFoundError(id)
image_path = path.join( image_path = Path(
"data", "users", current_user.user, "albums", image["album"], image["filename"] f"data/users/{current_user.user}/albums/{image['album']}/{image['filename']}"
) )
mime = Magic(mime=True).from_file(image_path) mime = Magic(mime=True).from_file(image_path)
@ -335,11 +330,9 @@ async def photo_move(
if col_albums.find_one({"user": current_user.user, "name": album}) is None: if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album) raise AlbumNameNotFoundError(album)
if path.exists( if Path(
path.join( f"data/users/{current_user.user}/albums/{album}/{image['filename']}"
"data", "users", current_user.user, "albums", album, image["filename"] ).exists():
)
):
base_name = image["filename"].split(".")[:-1] base_name = image["filename"].split(".")[:-1]
extension = image["filename"].split(".")[-1] extension = image["filename"].split(".")[-1]
filename = ( filename = (
@ -360,15 +353,10 @@ async def photo_move(
) )
move( move(
path.join( Path(
"data", f"data/users/{current_user.user}/albums/{image['album']}/{image['filename']}"
"users",
current_user.user,
"albums",
image["album"],
image["filename"],
), ),
path.join("data", "users", current_user.user, "albums", album, filename), Path(f"data/users/{current_user.user}/albums/{album}/{filename}"),
) )
return UJSONResponse( return UJSONResponse(
@ -441,13 +429,8 @@ async def photo_delete(
col_albums.update_one({"name": image["album"]}, {"$set": {"cover": None}}) col_albums.update_one({"name": image["album"]}, {"$set": {"cover": None}})
remove( remove(
path.join( Path(
"data", f"data/users/{current_user.user}/albums/{image['album']}/{image['filename']}"
"users",
current_user.user,
"albums",
image["album"],
image["filename"],
) )
) )

View File

@ -5,6 +5,8 @@ from secrets import token_urlsafe
from shutil import move from shutil import move
from typing import Union from typing import Union
from pathlib import Path
import aiofiles import aiofiles
from bson.errors import InvalidId from bson.errors import InvalidId
from bson.objectid import ObjectId from bson.objectid import ObjectId
@ -45,15 +47,11 @@ async def video_upload(
if col_albums.find_one({"user": current_user.user, "name": album}) is None: if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album) raise AlbumNameNotFoundError(album)
makedirs( makedirs(Path(f"data/users/{current_user.user}/albums/{album}"), exist_ok=True)
path.join("data", "users", current_user.user, "albums", album), exist_ok=True
)
filename = file.filename filename = file.filename
if path.exists( if Path(f"data/users/{current_user.user}/albums/{album}/{file.filename}").exists():
path.join("data", "users", current_user.user, "albums", album, file.filename)
):
base_name = file.filename.split(".")[:-1] base_name = file.filename.split(".")[:-1]
extension = file.filename.split(".")[-1] extension = file.filename.split(".")[-1]
filename = ( filename = (
@ -61,7 +59,7 @@ async def video_upload(
) )
async with aiofiles.open( async with aiofiles.open(
path.join("data", "users", current_user.user, "albums", album, filename), "wb" Path(f"data/users/{current_user.user}/albums/{album}/{filename}"), "wb"
) as f: ) as f:
await f.write(await file.read()) await f.write(await file.read())
@ -125,8 +123,8 @@ async def video_get(
except InvalidId: except InvalidId:
raise VideoNotFoundError(id) raise VideoNotFoundError(id)
video_path = path.join( video_path = Path(
"data", "users", current_user.user, "albums", video["album"], video["filename"] f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}"
) )
mime = Magic(mime=True).from_file(video_path) mime = Magic(mime=True).from_file(video_path)
@ -161,11 +159,9 @@ async def video_move(
if col_albums.find_one({"user": current_user.user, "name": album}) is None: if col_albums.find_one({"user": current_user.user, "name": album}) is None:
raise AlbumNameNotFoundError(album) raise AlbumNameNotFoundError(album)
if path.exists( if Path(
path.join( f"data"/users/{current_user.user}/albums/{album}/{video['filename']}"
"data", "users", current_user.user, "albums", album, video["filename"] ).exists():
)
):
base_name = video["filename"].split(".")[:-1] base_name = video["filename"].split(".")[:-1]
extension = video["filename"].split(".")[-1] extension = video["filename"].split(".")[-1]
filename = ( filename = (
@ -186,15 +182,8 @@ async def video_move(
) )
move( move(
path.join( Path(f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}"),
"data", Path(f"data/users/{current_user.user}/albums/{album}/{filename}"),
"users",
current_user.user,
"albums",
video["album"],
video["filename"],
),
path.join("data", "users", current_user.user, "albums", album, filename),
) )
return UJSONResponse( return UJSONResponse(
@ -264,14 +253,7 @@ async def video_delete(
album = col_albums.find_one({"name": video["album"]}) album = col_albums.find_one({"name": video["album"]})
remove( remove(
path.join( Path(f"data/users/{current_user.user}/albums/{video['album']}/{video['filename']}")
"data",
"users",
current_user.user,
"albums",
video["album"],
video["filename"],
)
) )
return Response(status_code=HTTP_204_NO_CONTENT) return Response(status_code=HTTP_204_NO_CONTENT)

View File

@ -1,5 +1,6 @@
from importlib.util import module_from_spec, spec_from_file_location from importlib.util import module_from_spec, spec_from_file_location
from os import getcwd, path, walk from os import getcwd, path, walk
from pathlib import Path
# ================================================================================= # =================================================================================
@ -12,7 +13,7 @@ def get_py_files(src):
for root, dirs, files in walk(src): for root, dirs, files in walk(src):
for file in files: for file in files:
if file.endswith(".py"): if file.endswith(".py"):
py_files.append(path.join(cwd, root, file)) py_files.append(Path(f"{cwd}/{root}/{file}"))
return py_files return py_files

View File

@ -1,13 +1,13 @@
from os import makedirs, path from os import makedirs
from pathlib import Path
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from modules.app import app from modules.app import app
from modules.extensions_loader import dynamic_import_from_src from modules.extensions_loader import dynamic_import_from_src
from modules.scheduler import scheduler from modules.scheduler import scheduler
from modules.utils import *
makedirs(path.join("data", "users"), exist_ok=True) makedirs(Path("data/users"), exist_ok=True)
@app.get("/favicon.ico", response_class=FileResponse, include_in_schema=False) @app.get("/favicon.ico", response_class=FileResponse, include_in_schema=False)