31 Commits

Author SHA1 Message Date
4cdb8fbd26 WIP: Implemented /consent give all, /consent withdraw all and /consent review (#51) 2025-07-28 01:12:35 +02:00
6f38ecb33d Added Makefile and development dependencies 2025-07-28 00:37:31 +02:00
cec35f10d7 Merge pull request 'Added basic analytics collector (#62)' (#63) from feature/profitroll/analytics into feature/profitroll/data-control
Reviewed-on: #63
2025-07-28 01:26:11 +03:00
7ef4372730 Added basic analytics collector (#62) 2025-07-28 00:23:36 +02:00
352f8c97ec WIP: Added consent durations and modified default embed colors (#51) 2025-07-24 19:52:12 +02:00
558b12bdbd WIP: Added basic implementation of consent withdrawal (#51) 2025-07-24 02:06:50 +02:00
6279bc4952 WIP: Added scope names to the localization and added a basic implementation of colors for data control (#51) 2025-07-24 00:40:08 +02:00
f61fa886d1 WIP: Added middleware for data control and changed the database index for consents (#51) 2025-07-23 21:55:43 +02:00
378473e453 WIP: Added scopes for commands and renamed consent scope "module_deepl" to "integration_deepl" 2025-07-22 01:31:58 +02:00
e0b2575d32 Added a minimal implementation for consent giving 2025-07-21 23:20:29 +02:00
0c2467209d Merge branch 'dev' into feature/profitroll/data-control
# Conflicts:
#	classes/__init__.py
#	classes/pycord_user.py
#	cogs/cog_admin.py
#	enums/__init__.py
#	enums/consent_scope.py
2025-07-21 22:43:55 +02:00
4be95428b5 Improved i18n in cog_admin, removed old wallet cog and added TODOs for consent durations 2025-07-21 22:38:32 +02:00
de2b04ca12 Made Wallet a child of BaseCacheable and improved caching 2025-07-21 00:24:06 +02:00
037e493bcc Implemented cache TTL 2025-07-21 00:23:06 +02:00
89307d8d0c Moved enums from classes/enums to enums 2025-07-21 00:22:31 +02:00
7565a643aa Merge pull request 'Update dependency libbot to v4.4.0' (#60) from renovate/libbot-4.x into dev
Reviewed-on: #60
2025-07-09 16:00:39 +03:00
d4474421e5 Update dependency libbot to v4.4.0 2025-07-09 16:00:19 +03:00
2684d9358e Merge pull request 'Update dependency libbot to v4.3.0' (#59) from renovate/libbot-4.x into dev
Reviewed-on: #59
2025-07-08 15:45:38 +03:00
a37827761b Update dependency libbot to v4.3.0 2025-07-08 02:41:39 +03:00
a553124e33 Merge pull request 'Update dependency fastapi to ~=0.116.0' (#58) from renovate/fastapi-0.x into dev
Reviewed-on: #58
2025-07-08 00:49:49 +03:00
cded34cb8a Update dependency fastapi to ~=0.116.0 2025-07-07 18:28:00 +03:00
71730362ef Closes #55 2025-06-07 21:06:16 +02:00
46edf5ea14 Removed old wallet cog and replaced "client" with "bot" in the new one 2025-06-07 00:43:47 +02:00
4ab7fb0630 WIP: Added config module and slash command stubs 2025-06-07 00:41:42 +02:00
9e10cf4fa4 Merge pull request 'Added stubs for Data and Consent cogs' (#54) from feature/data-control into feature/profitroll/data-control
Reviewed-on: #54
2025-06-07 00:21:27 +03:00
7b15480c30 WIP: Added stubs for Data and Consent cogs 2025-06-06 23:17:55 +02:00
996fe387df Improved health check and monitoring 2025-06-05 11:22:22 +02:00
fed2e0df07 Fixed database connection timeout not being handled during healthcheck 2025-06-04 09:58:01 +02:00
a109566738 Added missing FastAPI class 2025-06-04 09:42:39 +02:00
cbdfee63e4 WIP: Simple health check 2025-06-04 01:59:54 +02:00
1d8c29e73f Closes #12 2025-06-01 15:28:41 +02:00
59 changed files with 1627 additions and 371 deletions

20
Makefile Normal file
View File

@@ -0,0 +1,20 @@
.PHONY: setup update run dev-setup dev-update
setup:
uv venv
uv sync
update:
uv sync
run:
uv run uvicorn main:app
dev-setup:
uv venv
uv sync
uv sync --extra dev
dev-update:
uv sync
uv sync --extra dev

View File

@@ -1,8 +1,17 @@
from logging import Logger
from urllib.parse import urljoin
from fastapi import FastAPI
from fastapi_discord import DiscordOAuthClient
from fastapi_discord import RateLimited, Unauthorized
from fastapi_discord.exceptions import ClientSessionNotInitialized
from libbot.utils import config_get
from starlette.responses import JSONResponse
from classes.fastapi import FastAPI
from modules.utils.router_loader import run_router_setups
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
discord_oauth: DiscordOAuthClient = DiscordOAuthClient(
config_get("client_id", "api", "oauth"),
@@ -17,8 +26,30 @@ app: FastAPI = FastAPI(
version="0.0.1",
)
run_router_setups(app, "api.routers")
# TODO Replace this with a FastAPI lifespan
@app.on_event("startup")
async def on_startup():
await discord_oauth.init()
@app.exception_handler(Unauthorized)
async def unauthorized_error_handler(_, __) -> JSONResponse:
return JSONResponse({"error": "Unauthorized"}, status_code=401)
@app.exception_handler(RateLimited)
async def rate_limit_error_handler(_, exc: RateLimited) -> JSONResponse:
return JSONResponse(
{"error": "RateLimited", "retry": exc.retry_after, "message": exc.message},
status_code=429,
)
@app.exception_handler(ClientSessionNotInitialized)
async def client_session_error_handler(_, exc: ClientSessionNotInitialized) -> JSONResponse:
logger.error("Client session was not initialized: %s", exc, exc_info=exc)
return JSONResponse({"error": "Internal Error"}, status_code=500)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.2 KiB

View File

@@ -1,40 +0,0 @@
from logging import Logger
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from api.app import app
from classes import PycordGuild
from classes.errors import GuildNotFoundError, WalletNotFoundError
from classes.wallet import Wallet
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@app.get("/v1/admin/guilds/{guild_id}", response_class=JSONResponse)
async def get_guild_v1(guild_id: int) -> JSONResponse:
try:
guild: PycordGuild = await PycordGuild.from_id(guild_id, allow_creation=False)
except GuildNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Guild not found"
) from exc
except NotImplementedError as exc:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
) from exc
return JSONResponse(guild.to_dict(json_compatible=True))
@app.get("/v1/admin/guilds/{guild_id}/wallets/{user_id}", response_class=JSONResponse)
async def get_guild_wallet_v1(guild_id: int, user_id: int) -> JSONResponse:
try:
wallet: Wallet = await Wallet.from_id(user_id, guild_id, allow_creation=False)
except WalletNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Wallet not found"
) from exc
return JSONResponse(wallet.to_dict(json_compatible=True))

View File

@@ -1,41 +0,0 @@
from logging import Logger
from fastapi_discord import RateLimited, Unauthorized
from fastapi_discord.exceptions import ClientSessionNotInitialized
from starlette.responses import JSONResponse
from api.app import app, discord_oauth
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@app.get("/v1/login", response_class=JSONResponse)
async def login() -> JSONResponse:
return JSONResponse({"url": discord_oauth.oauth_login_url})
@app.get("/v1/callback", response_class=JSONResponse)
async def callback(code: str) -> JSONResponse:
token, refresh_token = await discord_oauth.get_access_token(code)
return JSONResponse({"access_token": token, "refresh_token": refresh_token})
@app.exception_handler(Unauthorized)
async def unauthorized_error_handler(_, __) -> JSONResponse:
return JSONResponse({"error": "Unauthorized"}, status_code=401)
@app.exception_handler(RateLimited)
async def rate_limit_error_handler(_, exc: RateLimited) -> JSONResponse:
return JSONResponse(
{"error": "RateLimited", "retry": exc.retry_after, "message": exc.message},
status_code=429,
)
@app.exception_handler(ClientSessionNotInitialized)
async def client_session_error_handler(_, exc: ClientSessionNotInitialized) -> JSONResponse:
logger.error("Client session was not initialized: %s", exc, exc_info=exc)
return JSONResponse({"error": "Internal Error"}, status_code=500)

View File

@@ -1,13 +0,0 @@
from logging import Logger
from starlette.responses import JSONResponse
from api.app import app
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@app.get("/v1/health", response_class=JSONResponse)
async def get_health_v1() -> JSONResponse:
return JSONResponse({"status": "ok"})

View File

@@ -1,23 +0,0 @@
from logging import Logger
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from api.app import app
from classes import PycordUser
from classes.errors import UserNotFoundError
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@app.get("/v1/admin/users/{user_id}", response_class=JSONResponse)
async def get_user_v1(user_id: int) -> JSONResponse:
try:
user: PycordUser = await PycordUser.from_id(user_id, allow_creation=False)
except UserNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
) from exc
return JSONResponse(user.to_dict(json_compatible=True))

View File

@@ -1,14 +0,0 @@
from logging import Logger
from pathlib import Path
from fastapi.responses import FileResponse
from api.app import app
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@app.get("/favicon.ico", response_class=FileResponse, include_in_schema=False)
async def favicon() -> FileResponse:
return FileResponse(Path("api/assets/favicon.ico"))

1
api/routers/__init__.py Normal file
View File

@@ -0,0 +1 @@
from . import auth, health, user, admin

View File

@@ -0,0 +1 @@
from . import guilds, users, wallets

View File

@@ -0,0 +1,42 @@
from logging import Logger
from fastapi import HTTPException, status, APIRouter
from fastapi.responses import JSONResponse
from starlette.requests import Request
from classes import PycordGuild
from classes.errors import GuildNotFoundError
from classes.fastapi import FastAPI
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1/admin/guilds", tags=["Admin - Guilds"])
# TODO Implement this method
@router_v1.get("/", response_class=JSONResponse)
async def get_guilds_v1() -> JSONResponse:
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented")
@router_v1.get("/{guild_id}", response_class=JSONResponse)
async def get_guild_v1(request: Request, guild_id: int) -> JSONResponse:
try:
guild: PycordGuild = await PycordGuild.from_id(
guild_id, allow_creation=False, cache=request.app.bot.cache
)
except GuildNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Guild not found"
) from exc
except NotImplementedError as exc:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
) from exc
return JSONResponse(guild.to_dict(json_compatible=True))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,34 @@
from logging import Logger
from fastapi import HTTPException, status, APIRouter
from fastapi.responses import JSONResponse
from starlette.requests import Request
from classes import PycordUser
from classes.errors import UserNotFoundError
from classes.fastapi import FastAPI
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(
prefix="/v1/admin/guilds/{guild_id}/users", tags=["Admin - Users"]
)
@router_v1.get("/{user_id}", response_class=JSONResponse)
async def get_guild_user_v1(request: Request, guild_id: int, user_id: int) -> JSONResponse:
try:
user: PycordUser = await PycordUser.from_id(
user_id, guild_id, allow_creation=False, cache=request.app.bot.cache
)
except UserNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
) from exc
return JSONResponse(user.to_dict(json_compatible=True))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,34 @@
from logging import Logger
from fastapi import HTTPException, status, APIRouter
from fastapi.responses import JSONResponse
from starlette.requests import Request
from classes.errors import WalletNotFoundError
from classes.fastapi import FastAPI
from classes.wallet import Wallet
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(
prefix="/v1/admin/guilds/{guild_id}/wallets", tags=["Admin - Wallets"]
)
@router_v1.get("/{user_id}", response_class=JSONResponse)
async def get_guild_wallet_v1(request: Request, guild_id: int, user_id: int) -> JSONResponse:
try:
wallet: Wallet = await Wallet.from_id(
user_id, guild_id, allow_creation=False, cache=request.app.bot.cache
)
except WalletNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Wallet not found"
) from exc
return JSONResponse(wallet.to_dict(json_compatible=True))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

27
api/routers/auth.py Normal file
View File

@@ -0,0 +1,27 @@
from logging import Logger
from fastapi import APIRouter
from starlette.responses import JSONResponse
from api.app import discord_oauth
from classes.fastapi import FastAPI
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1", tags=["Auth"])
@router_v1.get("/login", response_class=JSONResponse)
async def login() -> JSONResponse:
return JSONResponse({"url": discord_oauth.oauth_login_url})
@router_v1.get("/callback", response_class=JSONResponse)
async def callback(code: str) -> JSONResponse:
token, refresh_token = await discord_oauth.get_access_token(code)
return JSONResponse({"access_token": token, "refresh_token": refresh_token})
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

37
api/routers/health.py Normal file
View File

@@ -0,0 +1,37 @@
from datetime import datetime, timedelta
from logging import Logger
from typing import Optional
from zoneinfo import ZoneInfo
from fastapi import APIRouter, Request
from starlette.responses import JSONResponse
from classes import ApplicationHealth
from classes.fastapi import FastAPI
from modules.database import db
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1", tags=["User"])
@router_v1.get("/status", response_class=JSONResponse)
async def get_health_v1() -> JSONResponse:
return JSONResponse({"status": "ok"})
@router_v1.get("/health", response_class=JSONResponse)
async def get_health_v1(request: Request, detailed: Optional[bool] = False) -> JSONResponse:
if request.app.status is None or request.app.status.get_last_update() < (
datetime.now(tz=ZoneInfo("UTC")) - timedelta(seconds=30)
):
request.app.update_status(await ApplicationHealth.from_data(request.app, db))
health: ApplicationHealth = request.app.status
return JSONResponse(health.to_json(detailed=detailed if detailed is not None else False))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -1,20 +1,27 @@
from logging import Logger
from fastapi import Depends
from fastapi import Depends, APIRouter
from fastapi_discord import User
from starlette.responses import JSONResponse
from api.app import app, discord_oauth
from api.app import discord_oauth
from classes.fastapi import FastAPI
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1", tags=["User"])
@app.get(
"/v1/me",
@router_v1.get(
"/me",
dependencies=[Depends(discord_oauth.requires_authorization)],
response_model=User,
response_class=JSONResponse,
)
async def get_me_v1(user: User = Depends(discord_oauth.user)) -> User:
return user
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -1,7 +1,9 @@
from .consent import Consent
from .application_health import ApplicationHealth
from .guild_rules import GuildRules
from .pycord_guild import PycordGuild
from .pycord_user import PycordUser
from .service_status import ServiceStatus
# from .pycord_guild_colors import PycordGuildColors
# from .wallet import Wallet

View File

@@ -0,0 +1,155 @@
from dataclasses import dataclass
from datetime import datetime
from logging import Logger
from typing import Dict, Any, Optional
from zoneinfo import ZoneInfo
from libbot.cache.classes import Cache
from libbot.pycord.classes import PycordBot
from pymongo.asynchronous.database import AsyncDatabase
from pymongo.errors import ConnectionFailure
from classes.fastapi import FastAPI
from classes.service_status import ServiceStatus
from enums import HealthStatus
from modules.database import db_client
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@dataclass
class ApplicationHealth:
_last_update: datetime
api: ServiceStatus
bot: ServiceStatus
cache: ServiceStatus
database: ServiceStatus
@classmethod
async def from_data(cls, app: FastAPI, database: AsyncDatabase) -> "ApplicationHealth":
database_health: ServiceStatus = await ApplicationHealth.get_database_health(database)
cache_health: ServiceStatus = ApplicationHealth.get_cache_health(app.bot.cache)
data: Dict[str, Any] = {
"bot": ApplicationHealth.get_bot_health(app.bot, cache_health, database_health),
"cache": cache_health,
"database": database_health,
}
data["api"] = ApplicationHealth.get_api_health(
data["bot"], data["cache"], database_health
)
data["_last_update"] = datetime.now(tz=ZoneInfo("UTC"))
return cls(**data)
def update(self, app: FastAPI, database: AsyncDatabase) -> None:
raise NotImplementedError()
# TODO Fix the message
@staticmethod
def get_bot_health(
bot: PycordBot, cache_status: ServiceStatus, database_status: ServiceStatus
) -> ServiceStatus:
if not bot.is_ready():
return ServiceStatus(HealthStatus.FAILED, "discord connection has failed")
if database_status.status != HealthStatus.OPERATIONAL:
match database_status.status:
case HealthStatus.FAILED, HealthStatus.UNKNOWN:
return ServiceStatus(HealthStatus.FAILED, "database connection has failed")
case HealthStatus.DEGRADED:
return ServiceStatus(
HealthStatus.DEGRADED, "database connection is degraded"
)
if cache_status.status not in [HealthStatus.UNKNOWN, HealthStatus.OPERATIONAL]:
match cache_status.status:
case HealthStatus.FAILED:
return ServiceStatus(HealthStatus.DEGRADED, "cache connection has failed")
case HealthStatus.DEGRADED:
return ServiceStatus(HealthStatus.DEGRADED, "cache is degraded")
return ServiceStatus(HealthStatus.OPERATIONAL, None)
# TODO Fix the message
# TODO Implement this method
@staticmethod
def get_cache_health(cache: Cache) -> ServiceStatus:
return ServiceStatus(
HealthStatus.UNKNOWN,
None,
)
# TODO Fix the message
@staticmethod
async def get_database_health(database: AsyncDatabase) -> ServiceStatus:
try:
await db_client.admin.command("ping")
except ConnectionFailure as exc:
return ServiceStatus(HealthStatus.FAILED, str(exc))
return ServiceStatus(
HealthStatus.OPERATIONAL,
None,
)
@staticmethod
def get_api_health(
bot_status: ServiceStatus, cache_status: ServiceStatus, database_status: ServiceStatus
) -> ServiceStatus:
if database_status.status != HealthStatus.OPERATIONAL:
match database_status.status:
case HealthStatus.FAILED, HealthStatus.UNKNOWN:
return ServiceStatus(
HealthStatus.FAILED,
"database connection has failed",
)
case HealthStatus.DEGRADED:
return ServiceStatus(
HealthStatus.DEGRADED,
"database connection is degraded",
)
if bot_status.status != HealthStatus.OPERATIONAL:
match bot_status.status:
case HealthStatus.FAILED, HealthStatus.UNKNOWN:
return ServiceStatus(
HealthStatus.DEGRADED,
"bot integration has failed",
)
case HealthStatus.DEGRADED:
return ServiceStatus(
HealthStatus.DEGRADED,
"bot integration is degraded",
)
if cache_status.status not in [HealthStatus.OPERATIONAL, HealthStatus.UNKNOWN]:
match cache_status.status:
case HealthStatus.FAILED:
return ServiceStatus(HealthStatus.DEGRADED, "cache connection has failed")
case HealthStatus.DEGRADED:
return ServiceStatus(HealthStatus.DEGRADED, "cache is degraded")
return ServiceStatus(
HealthStatus.OPERATIONAL,
None,
)
def get_last_update(self) -> datetime:
return self._last_update
def to_json(self, detailed: Optional[bool] = False) -> Dict[str, Dict[str, str | None]]:
output: Dict[str, Any] = {
"api": self.api.to_json(detailed),
"bot": self.bot.to_json(detailed),
}
if detailed:
output["cache"] = self.cache.to_json(detailed)
output["database"] = self.database.to_json(detailed)
return output

View File

@@ -1,17 +1,19 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Literal, Any, Dict, Optional
from typing import Any, Dict, Optional
from zoneinfo import ZoneInfo
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo import DESCENDING
from pymongo.results import InsertOneResult, UpdateResult
from classes.base import BaseCacheable
from classes.enums import ConsentScope
from enums import ConsentScope
from modules.database import col_consents
from modules.utils import restore_from_cache
# TODO Implement all necessary methods
@dataclass
class Consent(BaseCacheable):
"""Dataclass of DB entry of a consent entry"""
@@ -31,21 +33,63 @@ class Consent(BaseCacheable):
_id: ObjectId
user_id: int
guild_id: int
scope: Literal[ConsentScope.GENERAL, ConsentScope.MODULE_DEEPL]
scope: ConsentScope
consent_date: datetime
expiration_date: datetime | None
withdrawal_date: datetime | None
@staticmethod
def get_cache_key(user_id: int, guild_id: int, scope: ConsentScope) -> str:
return f"{Consent.__short_name__}_{user_id}_{guild_id}_{scope.value}"
# TODO Implement this method
@classmethod
async def from_id(cls, id: ObjectId, cache: Optional[Cache] = None, **kwargs: Any) -> Any:
raise NotImplementedError()
# TODO Add documentation
@classmethod
def from_entry(cls, db_entry: Dict[str, Any]) -> "Consent":
async def from_combination(
cls,
user_id: int,
guild_id: int,
scope: ConsentScope,
cache: Optional[Cache] = None,
) -> Any:
cached_entry: Dict[str, Any] | None = restore_from_cache(
cls.__short_name__, Consent.get_cache_key(user_id, guild_id, scope), cache=cache
)
if cached_entry is not None:
return cls(**cls._entry_from_cache(cached_entry))
db_entry: Dict[str, Any] | None = await cls.__collection__.find_one(
{"user_id": user_id, "guild_id": guild_id, "scope": scope.value},
sort={"expiration_date": DESCENDING},
)
if db_entry is None:
# TODO Implement a unique exception
# raise ConsentNotFoundError(user_id, guild_id, scope)
raise RuntimeError(
f"Could not find a consent of user {user_id} from {guild_id} for scope '{scope.value}'"
)
if cache is not None:
cache.set_json(
Consent.get_cache_key(user_id, guild_id, scope),
cls._entry_to_cache(db_entry),
)
return cls(**db_entry)
# TODO Implement this method
@classmethod
def from_entry(cls, db_entry: Dict[str, Any]) -> "Consent":
db_entry["scope"] = ConsentScope(db_entry["scope"])
return cls(**db_entry)
# TODO Add documentation
@classmethod
async def give(
cls,
@@ -53,17 +97,77 @@ class Consent(BaseCacheable):
guild_id: int,
scope: ConsentScope,
expiration_date: Optional[datetime] = None,
cache: Optional[Cache] = None,
) -> Any:
raise NotImplementedError()
await cls.withdraw_scope_consents(user_id, guild_id, scope)
db_entry = Consent.get_defaults(user_id, guild_id, scope)
db_entry["scope"] = scope.value
db_entry["expiration_date"] = expiration_date
insert_result: InsertOneResult = await cls.__collection__.insert_one(db_entry)
db_entry["scope"] = scope
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(
Consent.get_cache_key(user_id, guild_id, scope),
cls._entry_to_cache(db_entry),
)
return cls(**db_entry)
@staticmethod
async def withdraw_scope_consents(
user_id: int, guild_id: int, scope: ConsentScope
) -> UpdateResult:
"""Look up consents of a user in a guild with a specified scope and withdraw them.
Args:
user_id (int): Discord ID of a user.
guild_id (int): Discord ID of a guild.
scope (:obj:ConsentScope): Scope to look for.
Returns:
UpdateResult: Result object of all affected consents.
"""
return await Consent.__collection__.update_many(
{
"user_id": user_id,
"guild_id": guild_id,
"scope": scope.value,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
},
{"$set": {"withdrawal_date": datetime.now(tz=ZoneInfo("UTC"))}},
)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self._id}"
return self.get_cache_key(self.user_id, self.guild_id, self.scope)
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
cache_entry["scope"] = cache_entry["scope"].value
cache_entry["consent_date"] = cache_entry["consent_date"].isoformat()
cache_entry["expiration_date"] = (
None
if cache_entry["expiration_date"] is None
else cache_entry["expiration_date"].isoformat()
)
cache_entry["withdrawal_date"] = (
None
if cache_entry["withdrawal_date"] is None
else cache_entry["withdrawal_date"].isoformat()
)
return cache_entry
@@ -72,6 +176,18 @@ class Consent(BaseCacheable):
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
cache_entry["scope"] = ConsentScope(cache_entry["scope"])
cache_entry["consent_date"] = datetime.fromisoformat(cache_entry["consent_date"])
cache_entry["expiration_date"] = (
None
if cache_entry["expiration_date"] is None
else datetime.fromisoformat(cache_entry["expiration_date"])
)
cache_entry["withdrawal_date"] = (
None
if cache_entry["withdrawal_date"] is None
else datetime.fromisoformat(cache_entry["withdrawal_date"])
)
return db_entry
@@ -88,7 +204,7 @@ class Consent(BaseCacheable):
"_id": self._id if not json_compatible else str(self._id),
"user_id": self.user_id,
"guild_id": self.guild_id,
"scope": self.scope,
"scope": self.scope if not json_compatible else self.scope.value,
"consent_date": self.consent_date.isoformat(),
"expiration_date": (
None if self.expiration_date is None else self.expiration_date.isoformat()
@@ -100,12 +216,14 @@ class Consent(BaseCacheable):
@staticmethod
def get_defaults(
user_id: Optional[int] = None, guild_id: Optional[int] = None
user_id: Optional[int] = None,
guild_id: Optional[int] = None,
scope: Optional[ConsentScope] = None,
) -> Dict[str, Any]:
return {
"user_id": user_id,
"guild_id": guild_id,
"scope": None,
"scope": scope,
"consent_date": datetime.now(tz=ZoneInfo("UTC")),
"expiration_date": None,
"withdrawal_date": None,

View File

@@ -1,3 +0,0 @@
from .consent_scope import ConsentScope
from .message_events import MessageEvents
from .punishment import Punishment

18
classes/fastapi.py Normal file
View File

@@ -0,0 +1,18 @@
from typing import Optional, Any
from fastapi import FastAPI as OriginalFastAPI
from libbot.pycord.classes import PycordBot
class FastAPI(OriginalFastAPI):
def __init__(self, *args, bot: Optional[PycordBot] = None, **kwargs) -> None:
self.bot: PycordBot | None = bot
self.status: Any | None = None
super().__init__(*args, **kwargs)
def set_bot(self, bot: PycordBot) -> None:
self.bot = bot
def update_status(self, status: Any) -> None:
self.status = status

View File

@@ -1,7 +1,7 @@
from dataclasses import dataclass
from typing import Literal
from classes.enums import Punishment
from enums import Punishment
@dataclass

View File

@@ -2,11 +2,11 @@ import logging
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Any, Dict, Literal
from typing import Any, Dict, Literal, Optional
from zoneinfo import ZoneInfo
from aiohttp import ClientSession
from discord import Activity, ActivityType, Guild, User
from discord import Activity, ActivityType, Guild, User, Embed, Colour
from libbot.cache.classes import CacheMemcached, CacheRedis
from libbot.cache.manager import create_cache_client
from libbot.i18n import BotLocale
@@ -14,7 +14,9 @@ from libbot.pycord.classes import PycordBot as LibPycordBot
from libbot.utils import json_read
from classes import PycordGuild, PycordUser
from enums import CacheTTL, EmbedColor
from modules.database import _update_database_indexes
from modules.utils.color_utils import hex_to_int
logger: Logger = logging.getLogger(__name__)
@@ -47,7 +49,10 @@ class PycordBot(LibPycordBot):
if "cache" in self.config and cache_type is not None:
self.cache = create_cache_client(
self.config, cache_type, prefix=self.config["cache"][cache_type]["prefix"]
self.config,
cache_type,
prefix=self.config["cache"][cache_type]["prefix"],
default_ttl_seconds=CacheTTL.NORMAL.value,
)
def _modified_string_getter(self, key: str, *args: str, locale: str | None = None) -> Any:
@@ -113,11 +118,12 @@ class PycordBot(LibPycordBot):
activity_message,
)
async def find_user(self, user: int | User) -> PycordUser:
async def find_user(self, user: int | User, guild_id: int) -> PycordUser:
"""Find User by its ID or User object.
Args:
user (int | User): ID or User object to extract ID from
guild_id (int): ID of the guild user is member of
Returns:
PycordUser: User object
@@ -126,9 +132,9 @@ class PycordBot(LibPycordBot):
UserNotFoundException: User was not found and creation was not allowed
"""
return (
await PycordUser.from_id(user, cache=self.cache)
await PycordUser.from_id(user, guild_id, cache=self.cache)
if isinstance(user, int)
else await PycordUser.from_id(user.id, cache=self.cache)
else await PycordUser.from_id(user.id, guild_id, cache=self.cache)
)
async def find_guild(self, guild: int | Guild) -> PycordGuild:
@@ -174,3 +180,38 @@ class PycordBot(LibPycordBot):
# args=[self, self.client_session],
# )
pass
# TODO Add support for guild colors
def create_embed(
self,
title: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = None,
color: Optional[EmbedColor] = EmbedColor.PRIMARY,
) -> Embed:
return Embed(
title=title,
description=description,
url=url,
color=Colour(hex_to_int(self.config["colors"][color.value])),
)
def create_embed_error(
self,
title: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = None,
) -> Embed:
return self.create_embed(
title=title, description=description, url=url, color=EmbedColor.ERROR
)
def create_embed_success(
self,
title: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = None,
) -> Embed:
return self.create_embed(
title=title, description=description, url=url, color=EmbedColor.SUCCESS
)

View File

@@ -2,7 +2,7 @@ import logging
from dataclasses import dataclass
from datetime import datetime
from logging import Logger
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from bson import ObjectId
@@ -11,9 +11,9 @@ from pymongo.results import InsertOneResult
from classes import Consent
from classes.base import BaseCacheable
from classes.enums import ConsentScope
from classes.errors.pycord_user import UserNotFoundError
from classes.wallet import Wallet
from enums import ConsentScope
from modules.database import col_users
from modules.utils import restore_from_cache
@@ -163,16 +163,17 @@ class PycordUser(BaseCacheable):
async def purge(self, cache: Optional[Cache] = None) -> None:
await super().purge(cache)
async def get_wallet(self, guild_id: int) -> Wallet:
async def get_wallet(self, guild_id: int, cache: Optional[Cache] = None) -> Wallet:
"""Get wallet of the user.
Args:
guild_id (int): Guild ID of the wallet
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
Wallet: Wallet object of the user
"""
return await Wallet.from_id(self.id, guild_id)
return await Wallet.from_id(self.id, guild_id, cache=cache)
# TODO Add documentation
async def has_active_consent(self, scope: ConsentScope) -> bool:
@@ -183,9 +184,10 @@ class PycordUser(BaseCacheable):
"guild_id": self.guild_id,
"scope": scope.value,
"withdrawal_date": None,
"$gt": {
"expiration_date": datetime.now(tz=ZoneInfo("UTC")).replace(tzinfo=None)
},
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
}
)
@@ -193,9 +195,12 @@ class PycordUser(BaseCacheable):
# TODO Add documentation
async def give_consent(
self, scope: ConsentScope, expiration_date: Optional[datetime] = None
self,
scope: ConsentScope,
expiration_date: Optional[datetime] = None,
cache: Optional[Cache] = None,
) -> None:
await Consent.give(self.id, self.guild_id, scope, expiration_date)
await Consent.give(self.id, self.guild_id, scope, expiration_date, cache=cache)
# TODO Add documentation
async def withdraw_consent(
@@ -204,15 +209,69 @@ class PycordUser(BaseCacheable):
cache: Optional[Cache] = None,
) -> None:
# TODO Test this query
async for consent_entry in Consent.__collection__.find(
await Consent.__collection__.update_many(
{
"user_id": self.id,
"guild_id": self.guild_id,
"scope": scope.value,
"withdrawal_date": None,
"$gt": {
"expiration_date": datetime.now(tz=ZoneInfo("UTC")).replace(tzinfo=None)
},
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
},
{"$set": {"withdrawal_date": datetime.now(tz=ZoneInfo("UTC"))}},
)
if cache is not None:
cache.delete(Consent.get_cache_key(self.id, self.guild_id, scope))
# TODO Add documentation
async def withdraw_all_consents(
self,
cache: Optional[Cache] = None,
) -> None:
filter: Dict[str, Any] = {
"user_id": self.id,
"guild_id": self.guild_id,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
}
consents: List[Dict[str, Any]] | None = await Consent.__collection__.find(
filter
).to_list()
await Consent.__collection__.update_many(
filter,
{"$set": {"withdrawal_date": datetime.now(tz=ZoneInfo("UTC"))}},
)
if cache is not None and consents is not None:
for consent in consents:
cache.delete(
Consent.get_cache_key(
self.id, self.guild_id, ConsentScope(consent["scope"])
)
)
async def get_consents(self) -> List[Consent]:
consents: List[Consent] = []
async for consent in Consent.__collection__.find(
{
"user_id": self.id,
"guild_id": self.guild_id,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
}
):
await Consent.from_entry(consent_entry).withdraw(cache)
consents.append(Consent.from_entry(consent))
return consents

23
classes/service_status.py Normal file
View File

@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import Literal, Dict, Optional
from enums import HealthStatus
@dataclass
class ServiceStatus:
status: Literal[
HealthStatus.OPERATIONAL,
HealthStatus.DEGRADED,
HealthStatus.FAILED,
HealthStatus.UNKNOWN,
]
message: str | None
def to_json(self, detailed: Optional[bool] = False) -> Dict[str, str | None]:
output: Dict[str, str | None] = {"status": self.status.value}
if detailed:
output["message"] = self.message
return output

View File

@@ -5,8 +5,10 @@ from logging import Logger
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from classes.base import BaseCacheable
from classes.errors.wallet import (
WalletBalanceLimitExceeded,
WalletInsufficientFunds,
@@ -14,12 +16,19 @@ from classes.errors.wallet import (
WalletOverdraftLimitExceeded,
)
from modules.database import col_wallets
from modules.utils import restore_from_cache
logger: Logger = logging.getLogger(__name__)
@dataclass
class Wallet:
class Wallet(BaseCacheable):
"""Dataclass of DB entry of a wallet"""
__slots__ = ("_id", "owner_id", "guild_id", "balance", "is_frozen", "created")
__short_name__ = "wallet"
__collection__ = col_wallets
_id: ObjectId
owner_id: int
guild_id: int
@@ -30,8 +39,19 @@ class Wallet:
# TODO Write a docstring
@classmethod
async def from_id(
cls, owner_id: int, guild_id: int, allow_creation: bool = True
cls,
owner_id: int,
guild_id: int,
allow_creation: bool = True,
cache: Optional[Cache] = None,
) -> "Wallet":
cached_entry: Dict[str, Any] | None = restore_from_cache(
cls.__short_name__, f"{owner_id}_{guild_id}", cache=cache
)
if cached_entry is not None:
return cls(**cls._entry_from_cache(cached_entry))
db_entry = await col_wallets.find_one({"owner_id": owner_id, "guild_id": guild_id})
if db_entry is None:
@@ -44,8 +64,34 @@ class Wallet:
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(
f"{cls.__short_name__}_{owner_id}_{guild_id}", cls._entry_to_cache(db_entry)
)
return cls(**db_entry)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self.owner_id}_{self.guild_id}"
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
cache_entry["created"] = cache_entry["created"].isoformat()
return cache_entry
@staticmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
db_entry["created"] = datetime.fromisoformat(db_entry["created"])
return db_entry
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert Wallet object to a JSON representation.
@@ -61,19 +107,9 @@ class Wallet:
"guild_id": self.guild_id,
"balance": self.balance,
"is_frozen": self.is_frozen,
"created": self.created,
"created": self.created if not json_compatible else self.created.isoformat(),
}
async def _set(self, key: str, value: Any) -> None:
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await col_wallets.update_one({"_id": self._id}, {"$set": {key: value}}, upsert=True)
logger.info("Set attribute '%s' of the wallet %s to '%s'", key, str(self._id), value)
@staticmethod
def get_defaults(
owner_id: Optional[int] = None, guild_id: Optional[int] = None
@@ -94,21 +130,32 @@ class Wallet:
return Wallet.get_defaults()[key]
# TODO Write a docstring
async def freeze(self) -> None:
await self._set("is_frozen", True)
async def freeze(
self,
cache: Optional[Cache] = None,
) -> None:
await self.update(cache, is_frozen=True)
# TODO Write a docstring
async def unfreeze(self) -> None:
await self._set("is_frozen", False)
async def unfreeze(
self,
cache: Optional[Cache] = None,
) -> None:
await self.update(cache, is_frozen=False)
# TODO Write a docstring
async def deposit(self, amount: float, balance_limit: Optional[float] = None) -> float:
async def deposit(
self,
amount: float,
balance_limit: Optional[float] = None,
cache: Optional[Cache] = None,
) -> float:
new_balance: float = round(self.balance + amount, 2)
if balance_limit is not None and new_balance > balance_limit:
raise WalletBalanceLimitExceeded(self, amount, balance_limit)
await self._set("balance", new_balance)
await self.update(cache, balance=new_balance)
return new_balance
@@ -118,6 +165,7 @@ class Wallet:
amount: float,
allow_overdraft: bool = False,
overdraft_limit: Optional[float] = None,
cache: Optional[Cache] = None,
) -> float:
if amount > self.balance:
if not allow_overdraft or overdraft_limit is None:
@@ -128,7 +176,7 @@ class Wallet:
new_balance: float = round(self.balance - amount, 2)
await self._set("balance", new_balance)
await self.update(cache, balance=new_balance)
return new_balance
@@ -140,6 +188,7 @@ class Wallet:
balance_limit: Optional[float] = None,
allow_overdraft: bool = False,
overdraft_limit: Optional[float] = None,
cache: Optional[Cache] = None,
) -> None:
# TODO Replace with a concrete exception
if amount < 0:
@@ -148,7 +197,7 @@ class Wallet:
# allow_creation might need to be set to False in the future
# if users will be able to opt out from having a wallet
wallet: Wallet = await self.from_id(
wallet_owner_id, wallet_guild_id, allow_creation=True
wallet_owner_id, wallet_guild_id, allow_creation=True, cache=cache
)
if balance_limit is not None and amount + wallet.balance > balance_limit:
@@ -162,5 +211,5 @@ class Wallet:
raise WalletOverdraftLimitExceeded(self, amount, overdraft_limit)
# TODO Make a sanity check to revert the transaction if anything goes wrong
await self.withdraw(amount, allow_overdraft, overdraft_limit)
await wallet.deposit(amount, balance_limit)
await self.withdraw(amount, allow_overdraft, overdraft_limit, cache=cache)
await wallet.deposit(amount, balance_limit, cache=cache)

View File

@@ -1,4 +1,5 @@
from discord import ApplicationContext, Cog, slash_command
from discord.ext.commands import dm_only
from libbot.i18n import _, in_every_locale
from classes.pycord_bot import PycordBot
@@ -15,14 +16,40 @@ class CogAdmin(Cog):
description=_("description", "commands", "reload"),
description_localizations=in_every_locale("description", "commands", "reload"),
)
@dm_only()
async def command_guess(self, ctx: ApplicationContext) -> None:
if ctx.user.id not in self.bot.owner_ids:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._(
"permission_denied_title", "messages", "general", locale=ctx.locale
),
self.bot._("permission_denied", "messages", "general", locale=ctx.locale),
),
ephemeral=True,
)
return
try:
self.bot.reload()
await self.bot.set_status()
await ctx.respond("Okay.")
await ctx.respond(
embed=self.bot.create_embed_success(
self.bot._("success_title", "messages", "general", locale=ctx.locale),
self.bot._("config_reload_success", "messages", "admin", locale=ctx.locale),
)
)
except Exception as exc:
await ctx.respond(f"Not okay:\n```\n{exc}\n```", ephemeral=True)
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "general", locale=ctx.locale),
self.bot._(
"config_reload_error", "messages", "admin", locale=ctx.locale
).format(error=exc),
),
ephemeral=True,
)
def setup(bot: PycordBot) -> None:

99
cogs/cog_analytics.py Normal file
View File

@@ -0,0 +1,99 @@
from datetime import datetime
from logging import Logger
from zoneinfo import ZoneInfo
from discord import Cog, Message
from classes.pycord_bot import PycordBot
from enums import AnalyticsEventType
from modules.database import col_analytics
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
class CogAnalytics(Cog):
def __init__(self, bot: PycordBot):
self.bot: PycordBot = bot
@Cog.listener()
async def on_message(self, message: Message) -> None:
if (
message.guild is None
or message.channel is None
or message.author is None
or message.author.bot
):
return
await col_analytics.insert_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_SENT.value,
"event_date": message.created_at,
"guild_id": message.guild.id,
"channel_id": message.channel.id,
"message_id": message.id,
"user_id": message.author.id,
"is_deleted": False,
}
)
@Cog.listener()
async def on_message_edit(self, before: Message, after: Message) -> None:
if (
after.guild is None
or after.channel is None
or after.author is None
or after.author.bot
):
return
await col_analytics.insert_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_EDITED.value,
"event_date": after.edited_at,
"guild_id": after.guild.id,
"channel_id": after.channel.id,
"message_id": after.id,
"user_id": after.author.id,
}
)
@Cog.listener()
async def on_message_delete(self, message: Message) -> None:
if (
message.guild is None
or message.channel is None
or message.author is None
or message.author.bot
):
return
await col_analytics.insert_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_DELETED.value,
"event_date": datetime.now(tz=ZoneInfo("UTC")),
"guild_id": message.guild.id,
"channel_id": message.channel.id,
"message_id": message.id,
"user_id": message.author.id,
}
)
await col_analytics.update_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_SENT.value,
"guild_id": message.guild.id,
"channel_id": message.channel.id,
"message_id": message.id,
},
{
"$set": {
"is_deleted": True,
}
},
)
def setup(bot: PycordBot) -> None:
bot.add_cog(CogAnalytics(bot))

335
cogs/cog_consent.py Normal file
View File

@@ -0,0 +1,335 @@
import logging
from datetime import datetime
from logging import Logger
from typing import Any, Dict, List
from zoneinfo import ZoneInfo
from discord import (
ApplicationContext,
Interaction,
OptionChoice,
SlashCommandGroup,
option,
)
from discord.ext import commands
from discord.ext.commands import guild_only
from libbot.i18n import _, in_every_locale
from tempora import parse_timedelta
from classes import Consent, PycordUser
from classes.pycord_bot import PycordBot
from enums import ConsentDuration, ConsentScope
logger: Logger = logging.getLogger(__name__)
class CogConsent(commands.Cog):
def __init__(self, client: PycordBot):
self.bot: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup("consent", "Consent management")
@staticmethod
def _get_scope_choices() -> List[OptionChoice]:
choices: List[OptionChoice] = []
for scope in ConsentScope._member_map_.values():
scope_value: str = scope.value
choices.append(
OptionChoice(
_("name", "data_control", "scopes", scope_value),
scope_value,
in_every_locale("name", "data_control", "scopes", scope_value),
)
)
return choices
@staticmethod
def _get_consent_durations() -> List[OptionChoice]:
choices: List[OptionChoice] = []
for duration in ConsentDuration._member_map_.values():
duration_value: str = duration.value
choices.append(
OptionChoice(
_(duration_value, "data_control", "consent_durations"),
duration_value,
in_every_locale(duration_value, "data_control", "consent_durations"),
)
)
return choices
# /consent terms <scope>
# Will provide information about terms
# TODO Implement i18n
# TODO Implement consent duration
@command_group.command(
name="terms",
description="View terms for the consent scope",
)
@guild_only()
@option(
"scope",
description="Scope of the consent",
choices=_get_scope_choices(),
)
async def command_consent_terms(self, ctx: ApplicationContext, scope: str) -> None:
scopes_config: Dict[str, Dict[str, Any]] = self.bot.config["modules"]["consent"][
"scopes"
]
if scope not in scopes_config:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "admin", locale=ctx.locale),
"Scope does not exist in the config!",
),
ephemeral=True,
)
return
scope_config: Dict[str, Any] = scopes_config[scope]
interaction: Interaction = await ctx.respond(
embed=self.bot.create_embed(
"Terms for {third_party_notice}**{scope_name}**".format(
third_party_notice=(
"" if not scope_config["is_third_party"] else "a third-party scope "
),
scope_name=self.bot._(
"name", "data_control", "scopes", scope, locale=ctx.locale
),
),
"Terms of use: {terms_url}\nPrivacy policy: {privacy_url}\n\nNote: Any consent given on this Discord server will be valid only for this server.".format(
terms_url=scope_config["terms_url"],
privacy_url=scope_config["privacy_url"],
),
),
ephemeral=True,
)
# /consent give <scope> [<duration>]
# Will provide information about terms and a button to confirm
# TODO Implement i18n
@command_group.command(
name="give",
description="Give consent to the scope",
)
@guild_only()
@option(
"scope",
description="Scope of the consent",
choices=_get_scope_choices(),
)
@option(
"duration",
description="Duration of the consent",
choices=_get_consent_durations(),
)
async def command_consent_give(
self, ctx: ApplicationContext, scope: str, duration: str = ConsentDuration.NORMAL.value
) -> None:
scopes_config: Dict[str, Dict[str, Any]] = self.bot.config["modules"]["consent"][
"scopes"
]
if scope not in scopes_config:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "admin", locale=ctx.locale),
"Scope does not exist in the config!",
),
ephemeral=True,
)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
is_scope_third_party: bool = self.bot.config["modules"]["consent"]["scopes"][scope][
"is_third_party"
]
expiration_date: datetime = datetime.now(tz=ZoneInfo("UTC")) + parse_timedelta(
self.bot.config["modules"]["consent"]["durations"][
"third_party" if is_scope_third_party else "first_party"
][duration]
)
try:
await user.give_consent(ConsentScope(scope), expiration_date, cache=self.bot.cache)
await ctx.respond(
embed=self.bot.create_embed_success("Success", "Consent has been given."),
ephemeral=True,
)
except Exception as exc:
logger.error("Could not give consent due to: %s", exc, exc_info=exc)
await ctx.respond(
embed=self.bot.create_embed_error("Error", "Something went wrong!"),
ephemeral=True,
)
# /consent withdraw <scope>
# Will directly withdraw consent if confirmation is provided
# TODO Implement i18n
# TODO Implement the command
@command_group.command(
name="withdraw",
description="Withdraw consent to the scope",
)
@guild_only()
@option(
"scope",
description="Scope of the consent",
choices=_get_scope_choices(),
)
async def command_consent_withdraw(self, ctx: ApplicationContext, scope: str) -> None:
scopes_config: Dict[str, Dict[str, Any]] = self.bot.config["modules"]["consent"][
"scopes"
]
if scope not in scopes_config:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "admin", locale=ctx.locale),
"Scope does not exist in the config!",
),
ephemeral=True,
)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
await user.withdraw_consent(ConsentScope(scope), cache=self.bot.cache)
await ctx.respond(
embed=self.bot.create_embed("Done", "The consent has been withdrawn."),
ephemeral=True,
)
# /consent give_all [<duration>] [<confirm>]
# Will inform about necessity to review all scopes and a button to confirm
# TODO Implement i18n
@command_group.command(
name="give_all",
description="Give consent to all scopes",
)
@guild_only()
@option(
"duration",
description="Duration of the consent",
choices=_get_consent_durations(),
)
@option(
"confirm",
description="Confirmation of the action",
required=False,
)
async def command_consent_give_all(
self,
ctx: ApplicationContext,
confirm: bool = False,
duration: str = ConsentDuration.NORMAL.value,
) -> None:
if not confirm:
await ctx.respond("Operation not confirmed!", ephemeral=True)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
try:
for scope in ConsentScope._member_map_.values():
is_scope_third_party: bool = self.bot.config["modules"]["consent"]["scopes"][
scope.value
]["is_third_party"]
expiration_date: datetime = datetime.now(tz=ZoneInfo("UTC")) + parse_timedelta(
self.bot.config["modules"]["consent"]["durations"][
"third_party" if is_scope_third_party else "first_party"
][duration]
)
await user.give_consent(
ConsentScope(scope), expiration_date, cache=self.bot.cache
)
except Exception as exc:
logger.error("Could not give consent due to: %s", exc, exc_info=exc)
await ctx.respond(
embed=self.bot.create_embed_error("Error", "Something went wrong!"),
ephemeral=True,
)
return
await ctx.respond(
embed=self.bot.create_embed_success("Success", "Consent has been given."),
ephemeral=True,
)
# /consent withdraw_all [<confirm>]
# Will directly withdraw all consents if confirmation is provided
# TODO Implement i18n
@command_group.command(
name="withdraw_all",
description="Withdraw consent to all scopes",
)
@guild_only()
@option(
"confirm",
description="Confirmation of the action",
required=False,
)
async def command_consent_withdraw_all(
self, ctx: ApplicationContext, confirm: bool = False
) -> None:
if not confirm:
await ctx.respond("Operation not confirmed!", ephemeral=True)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
await user.withdraw_all_consents(cache=self.bot.cache)
await ctx.respond(
embed=self.bot.create_embed("Done", "All consents have been withdrawn."),
ephemeral=True,
)
# /consent review
# Will show all consents provided by the user, including scopes and expiration dates
# TODO Implement i18n
@command_group.command(
name="review",
description="Review all given consents",
)
@guild_only()
async def command_consent_review(self, ctx: ApplicationContext) -> None:
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
consents: List[Consent] = await user.get_consents()
if len(consents) == 0:
await ctx.respond(
embed=self.bot.create_embed("Consents", "You have no active consents."),
ephemeral=True,
)
return
joined_consents: str = "\n".join(
[
f"`{consent.scope.value}` - Expires <t:{int(consent.expiration_date.timestamp())}>"
for consent in consents
]
)
await ctx.respond(
embed=self.bot.create_embed("Consents", f"Active consents:\n{joined_consents}"),
ephemeral=True,
)
def setup(client: PycordBot) -> None:
client.add_cog(CogConsent(client))

51
cogs/cog_data.py Normal file
View File

@@ -0,0 +1,51 @@
import logging
from logging import Logger
from discord import SlashCommandGroup, ApplicationContext, option
from discord.ext import commands
from classes.pycord_bot import PycordBot
logger: Logger = logging.getLogger(__name__)
class CogData(commands.Cog):
def __init__(self, client: PycordBot):
self.bot: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup("data", "Data management")
# /data checkout
# Export all user data in a ZIP archive
# TODO Implement i18n
# TODO Implement the command
@command_group.command(
name="checkout",
description="Checkout all user data",
)
async def command_data_checkout(self, ctx: ApplicationContext) -> None:
await ctx.respond("Command is not implemented!", ephemeral=True)
# /data purge [<confirm>]
# Soft-delete all user data
# TODO Implement i18n
# TODO Implement the command
@command_group.command(
name="purge",
description="Delete all user data",
)
@option(
"confirm",
description="Confirmation of the action",
required=False,
)
async def command_data_purge(self, ctx: ApplicationContext, confirm: bool = False) -> None:
if not confirm:
await ctx.respond("Operation not confirmed!", ephemeral=True)
return
await ctx.respond("Command is not implemented!", ephemeral=True)
def setup(client: PycordBot) -> None:
client.add_cog(CogData(client))

View File

@@ -1,8 +1,10 @@
from logging import Logger
from discord import Cog
from discord import Cog, ApplicationContext, DiscordException
from discord.ext.commands import CheckFailure
from classes.pycord_bot import PycordBot
from enums import EmbedColor
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
@@ -19,6 +21,27 @@ class CogUtility(Cog):
await self.bot.set_status()
# TODO Should probably also add an error message displayed to users
@Cog.listener()
async def on_application_command_error(
self, ctx: ApplicationContext, error: DiscordException
) -> None:
if isinstance(error, CheckFailure):
await ctx.respond(
embed=self.bot.create_embed(
"Something went wrong", str(error), color=EmbedColor.ERROR
),
ephemeral=True,
)
return
logger.error(
"An error has occurred during execution of a command %s: %s",
ctx.command,
error,
exc_info=error,
)
def setup(bot: PycordBot) -> None:
bot.add_cog(CogUtility(bot))

View File

@@ -3,17 +3,20 @@ from logging import Logger
from discord import ApplicationContext, SlashCommandGroup, User, option
from discord.ext import commands
from discord.ext.commands import guild_only
from classes.errors import WalletInsufficientFunds
from classes.pycord_bot import PycordBot
from classes.wallet import Wallet
from enums import ConsentScope
from modules.middleware import user_consent_required
logger: Logger = logging.getLogger(__name__)
class CogWallet(commands.Cog):
def __init__(self, client: PycordBot):
self.client: PycordBot = client
self.bot: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup("wallet", "Wallet management")
@@ -21,19 +24,26 @@ class CogWallet(commands.Cog):
name="balance",
description="View wallet's balance",
)
@guild_only()
@option("user", description="User whose balance to check (if not your own)", required=False)
@user_consent_required(ConsentScope.GENERAL)
async def command_wallet_balance(self, ctx: ApplicationContext, user: User = None) -> None:
wallet: Wallet = await Wallet.from_id(
ctx.user.id if not user else user.id, ctx.guild_id
ctx.user.id if not user else user.id, ctx.guild_id, cache=self.bot.cache
)
await ctx.respond(
self.client._("balance_own", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance
)
if user is None
else self.client._("balance_user", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance, user=user.display_name
embed=self.bot.create_embed(
self.bot._("balance_title", "messages", "wallet", locale=ctx.locale),
(
self.bot._("balance_own", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance
)
if user is None
else self.bot._(
"balance_user", "messages", "wallet", locale=ctx.locale
).format(balance=wallet.balance, user=user.display_name)
),
)
)
@@ -41,6 +51,7 @@ class CogWallet(commands.Cog):
name="transfer",
description="View wallet's balance",
)
@guild_only()
@option("user", description="Recipient")
@option("amount", description="Amount", min_value=0.01)
async def command_wallet_transfer(
@@ -50,21 +61,27 @@ class CogWallet(commands.Cog):
# Guild will be needed for overdraft options
# guild: PycordGuild = await PycordGuild.from_id(ctx.guild_id)
wallet: Wallet = await Wallet.from_id(ctx.user.id, ctx.guild_id)
wallet: Wallet = await Wallet.from_id(ctx.user.id, ctx.guild_id, cache=self.bot.cache)
try:
await wallet.transfer(user.id, ctx.guild_id, amount)
except WalletInsufficientFunds:
await ctx.respond(
self.client._(
"transfer_insufficient_funds", "messages", "wallet", locale=ctx.locale
).format(amount=round(abs(wallet.balance - amount), 2))
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "general", locale=ctx.locale),
self.bot._(
"transfer_insufficient_funds", "messages", "wallet", locale=ctx.locale
).format(amount=round(abs(wallet.balance - amount), 2)),
)
)
return
await ctx.respond(
self.client._("transfer_success", "messages", "wallet", locale=ctx.locale).format(
amount=amount, recipient=user.display_name
embed=self.bot.create_embed_success(
self.bot._("success_title", "messages", "general", locale=ctx.locale),
self.bot._("transfer_success", "messages", "wallet", locale=ctx.locale).format(
amount=amount, recipient=user.display_name
),
)
)

View File

@@ -1,73 +0,0 @@
import logging
from logging import Logger
from discord import ApplicationContext, SlashCommandGroup, User, option
from discord.ext import commands
from classes.errors import WalletInsufficientFunds
from classes.pycord_bot import PycordBot
from classes.wallet import Wallet
logger: Logger = logging.getLogger(__name__)
class WalletCog(commands.Cog):
def __init__(self, client: PycordBot):
self.client: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup("wallet", "Wallet management")
@command_group.command(
name="balance",
description="View wallet's balance",
)
@option("user", description="User whose balance to check (if not your own)", required=False)
async def command_wallet_balance(self, ctx: ApplicationContext, user: User = None) -> None:
wallet: Wallet = await Wallet.from_id(
ctx.user.id if not user else user.id, ctx.guild_id
)
await ctx.respond(
self.client._("balance_own", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance
)
if user is None
else self.client._("balance_user", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance, user=user.display_name
)
)
@command_group.command(
name="transfer",
description="View wallet's balance",
)
@option("user", description="Recipient")
@option("amount", description="Amount", min_value=0.01)
async def command_wallet_transfer(
self, ctx: ApplicationContext, user: User, amount: float
) -> None:
amount = round(amount, 2)
# Guild will be needed for overdraft options
# guild: PycordGuild = await PycordGuild.from_id(ctx.guild_id)
wallet: Wallet = await Wallet.from_id(ctx.user.id, ctx.guild_id)
try:
await wallet.transfer(user.id, ctx.guild_id, amount)
except WalletInsufficientFunds:
await ctx.respond(
self.client._(
"transfer_insufficient_funds", "messages", "wallet", locale=ctx.locale
).format(amount=round(abs(wallet.balance - amount), 2))
)
return
await ctx.respond(
self.client._("transfer_success", "messages", "wallet", locale=ctx.locale).format(
amount=amount, recipient=user.display_name
)
)
def setup(client: PycordBot) -> None:
client.add_cog(WalletCog(client))

View File

@@ -44,7 +44,43 @@
"privacy": {
"api_endpoint": "https://api.javelina.eu/v1"
},
"colors": {
"primary": "#A4A4A6",
"secondary": "#595351",
"success": "#57F287",
"warning": "#FEE75C",
"error": "#ED4245"
},
"modules": {
"consent": {
"enabled": true,
"scopes": {
"general": {
"name": "General",
"is_third_party": false,
"terms_url": "https://www.javelina.eu/terms-of-service",
"privacy_url": "https://www.javelina.eu/privacy-policy"
},
"integration_deepl": {
"name": "DeepL",
"is_third_party": true,
"terms_url": "https://www.deepl.com/en/terms-of-use",
"privacy_url": "https://www.deepl.com/en/privacy"
}
},
"durations": {
"first_party": {
"short": "1 month",
"normal": "12 months",
"long": "24 months"
},
"third_party": {
"short": "1 month",
"normal": "6 months",
"long": "12 months"
}
}
},
"leveling": {
"addition": 50,
"multiplier": 1.2

8
enums/__init__.py Normal file
View File

@@ -0,0 +1,8 @@
from .analytics_event_type import AnalyticsEventType
from .cache_ttl import CacheTTL
from .consent_duration import ConsentDuration
from .consent_scope import ConsentScope
from .embed_color import EmbedColor
from .health_status import HealthStatus
from .message_events import MessageEvents
from .punishment import Punishment

View File

@@ -0,0 +1,7 @@
from enum import Enum
class AnalyticsEventType(Enum):
GUILD_MESSAGE_SENT = "guild_message_sent"
GUILD_MESSAGE_EDITED = "guild_message_edited"
GUILD_MESSAGE_DELETED = "guild_message_deleted"

7
enums/cache_ttl.py Normal file
View File

@@ -0,0 +1,7 @@
from enum import Enum
class CacheTTL(Enum):
SHORT = 300
NORMAL = 3600
LONG = 86400

View File

@@ -0,0 +1,7 @@
from enum import Enum
class ConsentDuration(Enum):
SHORT = "short"
NORMAL = "normal"
LONG = "long"

View File

@@ -3,4 +3,4 @@ from enum import Enum
class ConsentScope(Enum):
GENERAL = "general"
MODULE_DEEPL = "module_deepl"
INTEGRATION_DEEPL = "integration_deepl"

9
enums/embed_color.py Normal file
View File

@@ -0,0 +1,9 @@
from enum import Enum
class EmbedColor(Enum):
PRIMARY = "primary"
SECONDARY = "secondary"
SUCCESS = "success"
WARNING = "warning"
ERROR = "error"

8
enums/health_status.py Normal file
View File

@@ -0,0 +1,8 @@
from enum import Enum
class HealthStatus(Enum):
OPERATIONAL = "operational"
DEGRADED = "degraded"
FAILED = "failed"
UNKNOWN = "unknown"

View File

@@ -1,5 +1,30 @@
{
"messages": {
"general": {
"permission_denied": "You're not allowed to perform this operation.",
"permission_denied_title": "Permission denied",
"error_title": "Something went wrong",
"success_title": "Success",
"warning_title": "Warning"
},
"admin": {
"config_reload_success": "Bot's configuration has been reloaded.",
"config_reload_error": "Could not reload the bot's configuration:\n```\n{error}\n```"
},
"data_control": {
"consent_required": "Consent to following scope(s) is required to access the command: {scopes}\n\nYou can use following commands to manage your consents:\n`/consent terms <scope>` - Review the terms for a consent\n`/consent give <scope>` - Give consent for a scope"
},
"wallet": {
"balance_title": "Balance",
"balance_own": "Your balance is `{balance}`.",
"balance_user": "**{user}**'s balance is `{balance}`.",
"transfer_success": "You have transferred `{amount}` to **{recipient}**.",
"transfer_insufficient_funds": "Insufficient funds. `{amount}` more is needed for this transaction."
},
"reload": {
"reload_success": "Configuration has been successfully reloaded.",
"reload_failure": "Could not reload the configuration:\n```\n{exception}\n```"
},
"welcome": {
"morning": [
"{0} Добрий ранок та ласкаво просимо! {1}"
@@ -18,6 +43,23 @@
]
}
},
"data_control": {
"scopes": {
"general": {
"name": "General",
"description": ""
},
"integration_deepl": {
"name": "Integration: DeepL",
"description": ""
}
},
"consent_durations": {
"short": "Short",
"normal": "Normal",
"long": "Long"
}
},
"commands": {
"reload": {
"description": "Reload bot's configuration"

View File

@@ -1,11 +1,30 @@
{
"messages": {
"general": {
"permission_denied": "You're not allowed to perform this operation.",
"permission_denied_title": "Permission denied",
"error_title": "Something went wrong",
"success_title": "Success",
"warning_title": "Warning"
},
"admin": {
"config_reload_success": "Bot's configuration has been reloaded.",
"config_reload_error": "Could not reload the bot's configuration:\n```\n{error}\n```"
},
"data_control": {
"consent_required": "Consent to following scope(s) is required to access the command: {scopes}\n\nYou can use following commands to manage your consents:\n`/consent terms <scope>` - Review the terms for a consent\n`/consent give <scope>` - Give consent for a scope"
},
"wallet": {
"balance_title": "Balance",
"balance_own": "Your balance is `{balance}`.",
"balance_user": "**{user}**'s balance is `{balance}`.",
"transfer_success": "You have transferred `{amount}` to **{recipient}**.",
"transfer_insufficient_funds": "Insufficient funds. `{amount}` more is needed for this transaction."
},
"reload": {
"reload_success": "Configuration has been successfully reloaded.",
"reload_failure": "Could not reload the configuration:\n```\n{exception}\n```"
},
"welcome": {
"morning": [
"{0} Good morning and welcome! {1}"
@@ -24,6 +43,23 @@
]
}
},
"data_control": {
"scopes": {
"general": {
"name": "General",
"description": ""
},
"integration_deepl": {
"name": "Integration: DeepL",
"description": ""
}
},
"consent_durations": {
"short": "Short",
"normal": "Normal",
"long": "Long"
}
},
"commands": {
"reload": {
"description": "Reload bot's configuration"

View File

@@ -1,11 +1,30 @@
{
"messages": {
"general": {
"permission_denied": "Ви не маєте права виконувати цю операцію.",
"permission_denied_title": "Permission denied",
"error_title": "Something went wrong",
"success_title": "Success",
"warning_title": "Warning"
},
"admin": {
"config_reload_success": "Конфігурацію бота було перезавантажено.",
"config_reload_error": "Не вдалося перезавантажити конфігурацію бота:\n```\n{error}\n```"
},
"data_control": {
"consent_required": "Для доступу до команди необхідна згода на наступні сфери застосування: {scopes}\n\nВи можете використовувати наступні команди для управління своїми згодами:\n`/consent terms <scope>` - Переглянути умови згоди\n`/consent give <scope>` - Надати згоду на певну сферу застосування"
},
"wallet": {
"balance_title": "Balance",
"balance_own": "Ваш баланс складає `{balance}`.",
"balance_user": "Баланс **{user}** складає `{balance}`.",
"transfer_success": "Ви перевели `{amount}` на рахунок **{recipient}**.",
"transfer_insufficient_funds": "Недостатньо коштів. Потрібно ще `{amount}` для цієї транзакції."
},
"reload": {
"reload_success": "Конфігурацію було успішно перезавантажено.",
"reload_failure": "Не вдалось перезавантажити конфігурацію:\n```\n{exception}\n```"
},
"welcome": {
"morning": [
"{0} Добрий ранок та ласкаво просимо! {1}",
@@ -38,6 +57,23 @@
]
}
},
"data_control": {
"scopes": {
"general": {
"name": "Загальне",
"description": ""
},
"integration_deepl": {
"name": "Інтеграція: DeepL",
"description": ""
}
},
"consent_durations": {
"short": "Короткий",
"normal": "Звичаний",
"long": "Довгий"
}
},
"commands": {
"reload": {
"description": "Перезавантажити конфігурацію бота"

View File

@@ -13,7 +13,6 @@ from libbot.utils import config_get
# Import required for uvicorn
from api.app import app # noqa
from classes.pycord_bot import PycordBot
from modules.extensions_loader import dynamic_import_from_src
from modules.scheduler import scheduler
from modules.utils import get_logger, get_logging_config
@@ -32,12 +31,11 @@ with contextlib.suppress(ImportError):
async def main():
bot = PycordBot(scheduler=scheduler)
bot: PycordBot = PycordBot(scheduler=scheduler)
bot.load_extension("cogs")
# Import API modules
dynamic_import_from_src("api/extensions", star_import=True)
app.set_bot(bot)
try:
await bot.start(config_get("bot_token", "bot"))

1
modules/__init__.py Normal file
View File

@@ -0,0 +1 @@
from . import utils, database, migrator, scheduler

View File

@@ -23,13 +23,14 @@ else:
)
# Async declarations
db_client = AsyncMongoClient(con_string)
db_client = AsyncMongoClient(con_string, connectTimeoutMS=3000)
db: AsyncDatabase = db_client.get_database(name=db_config["name"])
col_users: AsyncCollection = db.get_collection("users")
col_guilds: AsyncCollection = db.get_collection("guilds")
col_wallets: AsyncCollection = db.get_collection("wallets")
col_consents: AsyncCollection = db.get_collection("consents")
col_analytics: AsyncCollection = db.get_collection("analytics")
col_custom_channels: AsyncCollection = db.get_collection("custom_channels")
@@ -49,7 +50,7 @@ async def _update_database_indexes() -> None:
["owner_id", "guild_id"], name="owner_id-guild_id", unique=True
)
await col_consents.create_index(
["owner_id", "guild_id", "scope"], name="owner_id-guild_id-scope", unique=True
["owner_id", "guild_id", "scope"], name="owner_id-guild_id-scope", unique=False
)
await col_custom_channels.create_index(
["owner_id", "guild_id", "channel_id"], name="owner_id-guild_id-channel_id", unique=True

View File

@@ -1,74 +0,0 @@
import logging
from importlib.util import module_from_spec, spec_from_file_location
from os import getcwd, walk
from pathlib import Path
from types import ModuleType
from typing import List
logger = logging.getLogger(__name__)
# Import functions
# Took from https://stackoverflow.com/a/57892961
def get_py_files(src: str | Path) -> List[str]:
cwd = getcwd() # Current Working directory
py_files = []
for root, dirs, files in walk(src):
py_files.extend(Path(f"{cwd}/{root}/{file}") for file in files if file.endswith(".py"))
return py_files
def dynamic_import(module_name: str, py_path: str) -> ModuleType | None:
try:
module_spec = spec_from_file_location(module_name, py_path)
if module_spec is None:
raise RuntimeError(
f"Module spec from module name {module_name} and path {py_path} is None"
)
module = module_from_spec(module_spec)
if module_spec.loader is None:
logger.warning(
"Could not load extension %s due to spec loader being None.",
module_name,
)
return None
module_spec.loader.exec_module(module)
return module
except SyntaxError:
logger.warning(
"Could not load extension %s due to invalid syntax. Check logs/errors.log for details.",
module_name,
)
return None
except Exception as exc:
logger.warning("Could not load extension %s due to %s", module_name, exc)
return None
def dynamic_import_from_src(src: str | Path, star_import=False) -> None:
my_py_files = get_py_files(src)
for py_file in my_py_files:
module_name = Path(py_file).stem
logger.debug("Importing %s extension...", module_name)
imported_module = dynamic_import(module_name, py_file)
if imported_module is not None:
if star_import:
for obj in dir(imported_module):
globals()[obj] = imported_module.__dict__[obj]
else:
globals()[module_name] = imported_module
logger.info("Successfully loaded %s extension", module_name)
return

View File

@@ -0,0 +1 @@
from .data_control import user_consent_required

View File

@@ -0,0 +1,35 @@
from logging import Logger
from typing import Callable, List
from discord import ApplicationContext
from discord.ext import commands
from discord.ext.commands import Context, CheckFailure
from discord.ext.commands.context import T
from classes import PycordUser
from enums import ConsentScope
from modules.utils import get_logger
logger: Logger = get_logger(__name__)
def user_consent_required(*required_scopes: ConsentScope) -> Callable[[T], T]:
async def predicate(ctx: Context | ApplicationContext) -> True:
user: PycordUser = await ctx.bot.find_user(ctx.author.id, ctx.guild.id)
for required_scope in required_scopes:
if not await user.has_active_consent(required_scope):
localized_scopes: List[str] = [
ctx.bot._("name", "data_control", "scopes", scope.value, locale=ctx.locale)
for scope in required_scopes
]
raise CheckFailure(
ctx.bot._(
"consent_required", "messages", "data_control", locale=ctx.locale
).format(scopes=f"`{'`, `'.join(localized_scopes)}`")
)
return True
return commands.check(predicate)

View File

@@ -1,2 +1,3 @@
from .cache_utils import restore_from_cache
from .logging_utils import get_logger, get_logging_config
from .router_loader import run_router_setups

View File

@@ -0,0 +1,19 @@
import importlib
from pkgutil import iter_modules
from types import ModuleType
from fastapi import FastAPI
def run_router_setups(app: FastAPI, package: str) -> None:
package_path: str = package.replace(".", "/")
for _, name, is_package in iter_modules([package_path]):
full_module: str = f"{package}.{name}"
module: ModuleType = importlib.import_module(full_module)
if hasattr(module, "setup"):
module.setup(app)
if is_package:
run_router_setups(app, full_module)

View File

@@ -2,14 +2,14 @@ import logging
from datetime import datetime
from typing import Any, Dict, List
from classes.pycordbot import PycordBot
from database import col_messages
from discord import Embed
from pymongo import DESCENDING
from pytz import timezone
from ujson import loads
from classes.enums import MessageEvents
from classes.pycordbot import PycordBot
from enums import MessageEvents
from modules.utils import hex_to_int
from modules.weather.parser import parse_weather

View File

@@ -1,6 +1,35 @@
[project]
name = "Javelina"
version = "0.0.1"
dynamic = ["dependencies", "optional-dependencies"]
description = "Discord bot that manages the server and provides an additional RESTful API"
readme = "README.md"
license = "AGPL-3.0"
license-files = ["LICENSE"]
requires-python = ">=3.11"
[tool.setuptools.dynamic]
dependencies = { file = "requirements.txt" }
[tool.setuptools.dynamic.optional-dependencies]
dev = { file = "requirements/dev.txt" }
[tool.black]
line-length = 96
target-version = ["py311"]
target-version = ["py311", "py312", "py313"]
[tool.isort]
profile = "black"
profile = "black"
[tool.mypy]
namespace_packages = true
install_types = true
strict = true
show_error_codes = true
[tool.pylint]
disable = ["line-too-long"]
[tool.pylint.main]
extension-pkg-whitelist = ["ujson"]
py-version = 3.11

View File

@@ -1,11 +1,12 @@
aiohttp>=3.6.0
apscheduler~=3.11.0
fastapi[all]~=0.115.0
fastapi[all]==0.115.5
fastapi_discord==0.2.7
libbot[speed,pycord,cache]==4.2.0
libbot[speed,pycord,cache]==4.4.0
mongodb-migrations==1.3.1
pynacl~=1.5.0
pytz~=2025.1
tempora~=5.8.1
# Temporarily disabled because
# these are still unused for now

4
requirements/dev.txt Normal file
View File

@@ -0,0 +1,4 @@
black==25.1.0
isort==5.13.2
mypy==1.16.1
pylint==3.3.7