59 Commits

Author SHA1 Message Date
5acddf3e94 Update dependency typer to ~=0.17.1
All checks were successful
Test / build (pull_request) Successful in 32s
Test / build (push) Successful in 26s
2025-08-30 14:57:29 +03:00
1153a9441f Fixed formatting with isort and black
All checks were successful
Test / build (push) Successful in 26s
2025-08-03 01:51:22 +02:00
63c815e748 Added a basic implementation of scheduled actions (will be used for #9 in the future) 2025-08-03 01:50:23 +02:00
8af1cfd689 Added very basic implementation of /clear
All checks were successful
Test / build (push) Successful in 27s
2025-08-02 01:42:09 +02:00
2936705be0 Fixed wrong method's name in cog_admin
All checks were successful
Test / build (push) Successful in 55s
2025-08-01 23:52:16 +02:00
8d10901467 Update python Docker tag to v3.13
All checks were successful
Test / build (pull_request) Successful in 36s
Test / build (push) Successful in 35s
2025-07-29 01:24:03 +03:00
6ef1e4be38 Added a small Dockerfile
All checks were successful
Test / build (push) Successful in 27s
2025-07-28 23:46:27 +02:00
5b5d6a9d88 Improved Makefile and removed run from it, updated uv.lock
All checks were successful
Test / build (push) Successful in 27s
2025-07-28 12:55:52 +02:00
47f770528a Merge pull request 'revert f30617b943e8ecad0f9e96108c54b8b3ce0534a1' (#67) from renovate/fastapi-0.x into dev
All checks were successful
Test / build (push) Successful in 22s
Reviewed-on: #67
2025-07-28 08:13:06 +03:00
57e0a0e085 revert f30617b943
All checks were successful
Test / build (pull_request) Successful in 25s
revert Update dependency fastapi to ~=0.116.1,<0.117.0
2025-07-28 08:12:18 +03:00
f30617b943 Update dependency fastapi to ~=0.116.1,<0.117.0
All checks were successful
Test / build (pull_request) Successful in 26s
Test / build (push) Successful in 25s
2025-07-28 07:54:22 +03:00
88f5921ea0 Removed useless migration script, fixed a call in main and added the uv.lock file to Git
All checks were successful
Test / build (push) Successful in 25s
2025-07-28 06:50:39 +02:00
12beb78131 Fixed broken routers
All checks were successful
Test / build (push) Successful in 26s
2025-07-28 06:33:58 +02:00
8f8b76df2c Improved CLI with Typer and fixed broken guild commands
All checks were successful
Test / build (push) Successful in 27s
2025-07-28 06:23:00 +02:00
7e3bb55bab Update dependency fastapi to v0.116.1
All checks were successful
Test / build (pull_request) Successful in 27s
Test / build (push) Successful in 25s
2025-07-28 02:46:19 +03:00
48f24c3a6b Moved project structure to javelina/
All checks were successful
Test / build (push) Successful in 27s
2025-07-28 01:32:38 +02:00
d804d6eb75 Merge pull request 'Added basic implementation for #51 and #62' (#64) from feature/profitroll/data-control into dev
All checks were successful
Test / build (push) Successful in 26s
Reviewed-on: #64
2025-07-28 02:13:46 +03:00
4cdb8fbd26 WIP: Implemented /consent give all, /consent withdraw all and /consent review (#51) 2025-07-28 01:12:35 +02:00
6f38ecb33d Added Makefile and development dependencies 2025-07-28 00:37:31 +02:00
cec35f10d7 Merge pull request 'Added basic analytics collector (#62)' (#63) from feature/profitroll/analytics into feature/profitroll/data-control
Reviewed-on: #63
2025-07-28 01:26:11 +03:00
7ef4372730 Added basic analytics collector (#62) 2025-07-28 00:23:36 +02:00
337c86d35f Fixes CVE-2024-23334
All checks were successful
Test / build (push) Successful in 26s
2025-07-25 12:31:05 +03:00
fe7d11092c Added safety scan with Safety
All checks were successful
Test / build (push) Successful in 52s
2025-07-25 12:28:13 +03:00
352f8c97ec WIP: Added consent durations and modified default embed colors (#51) 2025-07-24 19:52:12 +02:00
558b12bdbd WIP: Added basic implementation of consent withdrawal (#51) 2025-07-24 02:06:50 +02:00
6279bc4952 WIP: Added scope names to the localization and added a basic implementation of colors for data control (#51) 2025-07-24 00:40:08 +02:00
f61fa886d1 WIP: Added middleware for data control and changed the database index for consents (#51) 2025-07-23 21:55:43 +02:00
378473e453 WIP: Added scopes for commands and renamed consent scope "module_deepl" to "integration_deepl" 2025-07-22 01:31:58 +02:00
e0b2575d32 Added a minimal implementation for consent giving 2025-07-21 23:20:29 +02:00
0c2467209d Merge branch 'dev' into feature/profitroll/data-control
# Conflicts:
#	classes/__init__.py
#	classes/pycord_user.py
#	cogs/cog_admin.py
#	enums/__init__.py
#	enums/consent_scope.py
2025-07-21 22:43:55 +02:00
4be95428b5 Improved i18n in cog_admin, removed old wallet cog and added TODOs for consent durations 2025-07-21 22:38:32 +02:00
de2b04ca12 Made Wallet a child of BaseCacheable and improved caching 2025-07-21 00:24:06 +02:00
037e493bcc Implemented cache TTL 2025-07-21 00:23:06 +02:00
89307d8d0c Moved enums from classes/enums to enums 2025-07-21 00:22:31 +02:00
7565a643aa Merge pull request 'Update dependency libbot to v4.4.0' (#60) from renovate/libbot-4.x into dev
Reviewed-on: #60
2025-07-09 16:00:39 +03:00
d4474421e5 Update dependency libbot to v4.4.0 2025-07-09 16:00:19 +03:00
2684d9358e Merge pull request 'Update dependency libbot to v4.3.0' (#59) from renovate/libbot-4.x into dev
Reviewed-on: #59
2025-07-08 15:45:38 +03:00
a37827761b Update dependency libbot to v4.3.0 2025-07-08 02:41:39 +03:00
a553124e33 Merge pull request 'Update dependency fastapi to ~=0.116.0' (#58) from renovate/fastapi-0.x into dev
Reviewed-on: #58
2025-07-08 00:49:49 +03:00
cded34cb8a Update dependency fastapi to ~=0.116.0 2025-07-07 18:28:00 +03:00
71730362ef Closes #55 2025-06-07 21:06:16 +02:00
46edf5ea14 Removed old wallet cog and replaced "client" with "bot" in the new one 2025-06-07 00:43:47 +02:00
4ab7fb0630 WIP: Added config module and slash command stubs 2025-06-07 00:41:42 +02:00
9e10cf4fa4 Merge pull request 'Added stubs for Data and Consent cogs' (#54) from feature/data-control into feature/profitroll/data-control
Reviewed-on: #54
2025-06-07 00:21:27 +03:00
7b15480c30 WIP: Added stubs for Data and Consent cogs 2025-06-06 23:17:55 +02:00
996fe387df Improved health check and monitoring 2025-06-05 11:22:22 +02:00
fed2e0df07 Fixed database connection timeout not being handled during healthcheck 2025-06-04 09:58:01 +02:00
a109566738 Added missing FastAPI class 2025-06-04 09:42:39 +02:00
cbdfee63e4 WIP: Simple health check 2025-06-04 01:59:54 +02:00
54bfef981d WIP: Implemented basic methods for Consent and added necessary methods in PycordUser (#51) 2025-06-01 16:00:40 +02:00
1d8c29e73f Closes #12 2025-06-01 15:28:41 +02:00
4b4b9f5b0d WIP: Added stubs for #51 2025-06-01 01:06:26 +02:00
d08ea6240e Added simple Discord auth 2025-05-26 09:17:23 +02:00
ce86b95163 Slightly improved API extensions 2025-05-26 01:47:49 +02:00
296ef50a53 Removed "en" because it was already replaced by "en-US" 2025-05-25 22:57:51 +02:00
d5dc438601 WIP: #44, #43, #13, #12 2025-05-25 22:32:36 +02:00
62ee26b20f Replaced async_pymongo with default pymongo's async calls, fixed indexes 2025-05-19 00:16:51 +02:00
27ab68f6c5 Allowed empty wallet creation during transfers 2025-05-19 00:16:17 +02:00
32f19ee16b Added support for cache prefixes, improved logging and cached objects 2025-05-19 00:15:41 +02:00
91 changed files with 5154 additions and 709 deletions

32
.gitea/workflows/test.yml Normal file
View File

@@ -0,0 +1,32 @@
name: Test
on:
push:
branches:
- main
- staging
- dev
tags-ignore:
- v*
pull_request:
branches:
- main
- staging
- dev
jobs:
build:
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Run safety check
uses: pyupio/safety-action@v1
with:
api-key: ${{ secrets.SAFETY_API_KEY }}

14
Dockerfile Normal file
View File

@@ -0,0 +1,14 @@
# WARNING: Docker deployment is not officially supported (yet)
FROM python:3.13-slim
LABEL authors="Javelina"
COPY . /app/
WORKDIR /app
RUN pip install -r requirements.txt
EXPOSE 8000/tcp
CMD ["uvicorn", "main:app"]

29
Makefile Normal file
View File

@@ -0,0 +1,29 @@
.PHONY: setup setup-uv update update-uv dev-setup dev-setup-uv dev-update dev-update-uv
setup:
python3 -m venv .venv
.venv/bin/pip install -r requirements.txt
setup-uv:
uv venv
uv sync
update:
.venv/bin/pip install -r requirements.txt
update-uv:
uv sync
dev-setup:
python3 -m venv .venv
.venv/bin/pip install -r requirements.txt -r requirements/dev.txt
dev-setup-uv:
uv venv
uv sync --extra dev
dev-update:
.venv/bin/pip install -r requirements.txt -r requirements/dev.txt
dev-update-uv:
uv sync --extra dev

View File

@@ -1,7 +0,0 @@
from fastapi import FastAPI
# TODO Add an integration for the contact information
app = FastAPI(
title="Javelina",
version="0.0.1",
)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 4.2 KiB

View File

@@ -1,40 +0,0 @@
import logging
from logging import Logger
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from api.app import app
from classes import PycordGuild
from classes.errors import WalletNotFoundError, GuildNotFoundError
from classes.wallet import Wallet
logger: Logger = logging.getLogger(__name__)
@app.get("/v1/guilds/{guild_id}", response_class=JSONResponse)
async def get_guild_wallet(guild_id: int):
try:
guild: PycordGuild = await PycordGuild.from_id(guild_id, allow_creation=False)
except GuildNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Guild not found"
) from exc
except NotImplementedError as exc:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
) from exc
return guild.to_dict(json_compatible=True)
@app.get("/v1/guilds/{guild_id}/wallets/{user_id}", response_class=JSONResponse)
async def get_guild_wallet(guild_id: int, user_id: int):
try:
wallet: Wallet = await Wallet.from_id(user_id, guild_id, allow_creation=False)
except WalletNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Wallet not found"
) from exc
return wallet.to_dict(json_compatible=True)

View File

@@ -1,23 +0,0 @@
import logging
from logging import Logger
from fastapi import HTTPException, status
from fastapi.responses import JSONResponse
from api.app import app
from classes import PycordUser
from classes.errors import UserNotFoundError
logger: Logger = logging.getLogger(__name__)
@app.get("/v1/users/{user_id}", response_class=JSONResponse)
async def get_user(user_id: int):
try:
user: PycordUser = await PycordUser.from_id(user_id, allow_creation=False)
except UserNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
) from exc
return user.to_dict(json_compatible=True)

View File

@@ -1,14 +0,0 @@
import logging
from logging import Logger
from pathlib import Path
from fastapi.responses import FileResponse
from api.app import app
logger: Logger = logging.getLogger(__name__)
@app.get("/favicon.ico", response_class=FileResponse, include_in_schema=False)
async def favicon():
return FileResponse(Path("api/assets/favicon.ico"))

View File

@@ -1,5 +0,0 @@
from .pycord_guild import PycordGuild
from .pycord_guild_colors import PycordGuildColors
from .pycord_user import PycordUser
# from .wallet import Wallet

View File

@@ -1,15 +0,0 @@
from dataclasses import dataclass
from datetime import datetime
from bson import ObjectId
@dataclass
class CustomChannel:
_id: ObjectId
channel_id: int
owner_id: int
allow_comments: bool
allow_reactions: bool
created: datetime
deleted: datetime | None

View File

@@ -1 +0,0 @@
from .message_events import MessageEvents

View File

@@ -1,81 +0,0 @@
import logging
from logging import Logger
from typing import Any
from aiohttp import ClientSession
from discord import User
from libbot.cache.manager import create_cache_client
from libbot.pycord.classes import PycordBot as LibPycordBot
from classes import PycordUser
logger: Logger = logging.getLogger(__name__)
# from modules.tracking.dhl import update_tracks_dhl
class PycordBot(LibPycordBot):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._set_cache_engine()
self.client_session = ClientSession()
if self.scheduler is None:
return
# This replacement exists because of the different
# i18n formats than provided by libbot
self._ = self._modified_string_getter
# Scheduler job for DHL parcel tracking
# self.scheduler.add_job(
# update_tracks_dhl,
# trigger="cron",
# hour=self.config["modules"]["tracking"]["fetch_hours"],
# args=[self, self.client_session],
# )
def _modified_string_getter(self, key: str, *args: str, locale: str | None = None) -> Any:
"""This method exists because of the different i18n formats than provided by libbot.
It splits "-" and takes the first part of the provided locale to make complex language codes
compatible with an easy libbot approach to i18n.
"""
return self.bot_locale._(
key, *args, locale=None if locale is None else locale.split("-")[0]
)
def _set_cache_engine(self) -> None:
if "cache" in self.config and self.config["cache"]["type"] is not None:
self.cache = create_cache_client(self.config, self.config["cache"]["type"])
async def find_user(self, user: int | User) -> PycordUser:
"""Find User by its ID or User object.
Args:
user (int | User): ID or User object to extract ID from
Returns:
PycordUser: User object
Raises:
UserNotFoundException: User was not found and creation was not allowed
"""
return (
await PycordUser.from_id(user, cache=self.cache)
if isinstance(user, int)
else await PycordUser.from_id(user.id, cache=self.cache)
)
async def start(self, *args: Any, **kwargs: Any) -> None:
await super().start(*args, **kwargs)
async def close(self, **kwargs) -> None:
await self.client_session.close()
if self.scheduler is not None:
self.scheduler.shutdown()
await super().close(**kwargs)

View File

@@ -1,47 +0,0 @@
from dataclasses import dataclass
from typing import Dict, Any, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
@dataclass
class PycordGuild:
_id: ObjectId
id: int
def __init__(self) -> None:
raise NotImplementedError()
@classmethod
async def from_id(
cls, guild_id: int, allow_creation: bool = True, cache: Optional[Cache] = None
) -> "PycordGuild":
"""Find guild in database and create new record if guild does not exist.
Args:
guild_id (int): User's Discord ID
allow_creation (:obj:`bool`, optional): Create new guild record if none found in the database
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
PycordGuild: User object
Raises:
GuildNotFoundError: User was not found and creation was not allowed
"""
raise NotImplementedError()
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordGuild object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordGuild
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
}

View File

@@ -1,172 +0,0 @@
import logging
from dataclasses import dataclass
from logging import Logger
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from classes.errors.pycord_user import UserNotFoundError
from classes.wallet import Wallet
from modules.database import col_users
logger: Logger = logging.getLogger(__name__)
@dataclass
class PycordUser:
"""Dataclass of DB entry of a user"""
__slots__ = ("_id", "id")
_id: ObjectId
id: int
@classmethod
async def from_id(
cls, user_id: int, allow_creation: bool = True, cache: Optional[Cache] = None
) -> "PycordUser":
"""Find user in database and create new record if user does not exist.
Args:
user_id (int): User's Discord ID
allow_creation (:obj:`bool`, optional): Create new user record if none found in the database
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
PycordUser: User object
Raises:
UserNotFoundError: User was not found and creation was not allowed
"""
if cache is not None:
cached_entry: Dict[str, Any] | None = cache.get_json(f"user_{user_id}")
if cached_entry is not None:
return cls(**cached_entry)
db_entry = await col_users.find_one({"id": user_id})
if db_entry is None:
if not allow_creation:
raise UserNotFoundError(user_id)
db_entry = PycordUser.get_defaults(user_id)
insert_result: InsertOneResult = await col_users.insert_one(db_entry)
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(f"user_{user_id}", db_entry)
return cls(**db_entry)
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordUser object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordUser
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
}
async def _set(self, key: str, value: Any, cache: Optional[Cache] = None) -> None:
"""Set attribute data and save it into the database.
Args:
key (str): Attribute to change
value (Any): Value to set
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await col_users.update_one({"_id": self._id}, {"$set": {key: value}}, upsert=True)
self._update_cache(cache)
logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value)
async def _remove(self, key: str, cache: Optional[Cache] = None) -> None:
"""Remove attribute data and save it into the database.
Args:
key (str): Attribute to remove
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
default_value: Any = PycordUser.get_default_value(key)
setattr(self, key, default_value)
await col_users.update_one(
{"_id": self._id}, {"$set": {key: default_value}}, upsert=True
)
self._update_cache(cache)
logger.info("Removed attribute '%s' of user %s", key, self.id)
def _get_cache_key(self) -> str:
return f"user_{self.id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
user_dict: Dict[str, Any] = self.to_dict()
if user_dict is not None:
cache.set_json(self._get_cache_key(), user_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
@staticmethod
def get_defaults(user_id: Optional[int] = None) -> Dict[str, Any]:
return {
"id": user_id,
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in PycordUser.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in PycordUser")
return PycordUser.get_defaults()[key]
async def purge(self, cache: Optional[Cache] = None) -> None:
"""Completely remove user data from database. Currently only removes the user record from users collection.
Args:
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
await col_users.delete_one({"_id": self._id})
self._delete_cache(cache)
async def get_wallet(self, guild_id: int) -> Wallet:
"""Get wallet of the user.
Args:
guild_id (int): Guild ID of the wallet
Returns:
Wallet: Wallet object of the user
"""
return await Wallet.from_id(self.id, guild_id)

50
cli.py
View File

@@ -1,26 +1,40 @@
import logging
from argparse import ArgumentParser
from logging import Logger
from pathlib import Path
from shutil import copyfile
from modules.migrator import migrate_database
from typer import Option, Typer, echo
logger: Logger = logging.getLogger(__name__)
from javelina.modules.migrator import migrate_database
parser = ArgumentParser(
prog="Javelina",
description="Discord bot for community management.",
)
parser.add_argument("--migrate", action="store_true")
parser.add_argument("--only-api", action="store_true")
args = parser.parse_args()
cli: Typer = Typer()
def main():
if args.migrate:
logger.info("Performing migrations...")
migrate_database()
@cli.command()
def init(
destination: Path = Option(
"config.json", help="File to write the default configuration to"
),
overwrite: bool = Option(False, help="Overwrite config if already exists"),
) -> None:
example_path: Path = Path("config_example.json")
if destination.exists() and not overwrite:
raise FileExistsError(
f"File at {destination} already exists. Pass --overwrite to overwrite it"
)
copyfile(example_path, destination)
echo(f"Copied default config to {destination}")
@cli.command()
def migrate() -> None:
echo("Performing migrations...")
migrate_database()
def main() -> None:
cli()
if __name__ == "__main__":

View File

@@ -1,73 +0,0 @@
import logging
from logging import Logger
from discord import ApplicationContext, SlashCommandGroup, option, User
from discord.ext import commands
from classes.errors import WalletInsufficientFunds
from classes.pycord_bot import PycordBot
from classes.wallet import Wallet
logger: Logger = logging.getLogger(__name__)
class WalletCog(commands.Cog):
def __init__(self, client: PycordBot):
self.client: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup("wallet", "Wallet management")
@command_group.command(
name="balance",
description="View wallet's balance",
)
@option("user", description="User whose balance to check (if not your own)", required=False)
async def command_wallet_balance(self, ctx: ApplicationContext, user: User = None) -> None:
wallet: Wallet = await Wallet.from_id(
ctx.user.id if not user else user.id, ctx.guild_id
)
await ctx.respond(
self.client._("balance_own", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance
)
if user is None
else self.client._("balance_user", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance, user=user.display_name
)
)
@command_group.command(
name="transfer",
description="View wallet's balance",
)
@option("user", description="Recipient")
@option("amount", description="Amount", min_value=0.01)
async def command_wallet_transfer(
self, ctx: ApplicationContext, user: User, amount: float
) -> None:
amount = round(amount, 2)
# Guild will be needed for overdraft options
# guild: PycordGuild = await PycordGuild.from_id(ctx.guild_id)
wallet: Wallet = await Wallet.from_id(ctx.user.id, ctx.guild_id)
try:
await wallet.transfer(user.id, ctx.guild_id, amount)
except WalletInsufficientFunds:
await ctx.respond(
self.client._(
"transfer_insufficient_funds", "messages", "wallet", locale=ctx.locale
).format(amount=round(abs(wallet.balance - amount), 2))
)
return
await ctx.respond(
self.client._("transfer_success", "messages", "wallet", locale=ctx.locale).format(
amount=amount, recipient=user.display_name
)
)
def setup(client: PycordBot) -> None:
client.add_cog(WalletCog(client))

View File

@@ -1,5 +1,5 @@
{
"locale": "en",
"locale": "en-US",
"debug": false,
"bot": {
"owners": [
@@ -16,6 +16,13 @@
"activity_text": "The Game Of Life"
}
},
"api": {
"oauth": {
"client_id": null,
"client_secret": null
},
"public_url": "http://127.0.0.1:8000"
},
"database": {
"user": null,
"password": null,
@@ -26,16 +33,54 @@
"cache": {
"type": null,
"memcached": {
"uri": "127.0.0.1:11211"
"uri": "127.0.0.1:11211",
"prefix": null
},
"redis": {
"uri": "redis://127.0.0.1:6379/0"
"uri": "redis://127.0.0.1:6379/0",
"prefix": null
}
},
"privacy": {
"api_endpoint": "https://api.javelina.eu/v1"
},
"colors": {
"primary": "#A4A4A6",
"secondary": "#595351",
"success": "#57F287",
"warning": "#FEE75C",
"error": "#ED4245"
},
"modules": {
"consent": {
"enabled": true,
"scopes": {
"general": {
"name": "General",
"is_third_party": false,
"terms_url": "https://www.javelina.eu/terms-of-service",
"privacy_url": "https://www.javelina.eu/privacy-policy"
},
"integration_deepl": {
"name": "DeepL",
"is_third_party": true,
"terms_url": "https://www.deepl.com/en/terms-of-use",
"privacy_url": "https://www.deepl.com/en/privacy"
}
},
"durations": {
"first_party": {
"short": "1 month",
"normal": "12 months",
"long": "24 months"
},
"third_party": {
"short": "1 month",
"normal": "6 months",
"long": "12 months"
}
}
},
"leveling": {
"addition": 50,
"multiplier": 1.2

54
javelina/api/app.py Normal file
View File

@@ -0,0 +1,54 @@
from logging import Logger
from urllib.parse import urljoin
from fastapi_discord import DiscordOAuthClient, RateLimited, Unauthorized
from fastapi_discord.exceptions import ClientSessionNotInitialized
from libbot.utils import config_get
from starlette.responses import JSONResponse
from javelina.classes.fastapi import FastAPI
from javelina.modules.utils import get_logger
from javelina.modules.utils.router_loader import run_router_setups
logger: Logger = get_logger(__name__)
discord_oauth: DiscordOAuthClient = DiscordOAuthClient(
config_get("client_id", "api", "oauth"),
config_get("client_secret", "api", "oauth"),
urljoin(config_get("public_url", "api"), "/v1/callback"),
("identify", "guilds"),
)
# TODO Add an integration for the contact information
app: FastAPI = FastAPI(
title="Javelina",
version="0.0.1",
)
run_router_setups(app, "javelina.api.routers")
# TODO Replace this with a FastAPI lifespan
@app.on_event("startup")
async def on_startup():
await discord_oauth.init()
@app.exception_handler(Unauthorized)
async def unauthorized_error_handler(_, __) -> JSONResponse:
return JSONResponse({"error": "Unauthorized"}, status_code=401)
@app.exception_handler(RateLimited)
async def rate_limit_error_handler(_, exc: RateLimited) -> JSONResponse:
return JSONResponse(
{"error": "RateLimited", "retry": exc.retry_after, "message": exc.message},
status_code=429,
)
@app.exception_handler(ClientSessionNotInitialized)
async def client_session_error_handler(_, exc: ClientSessionNotInitialized) -> JSONResponse:
logger.error("Client session was not initialized: %s", exc, exc_info=exc)
return JSONResponse({"error": "Internal Error"}, status_code=500)

View File

@@ -0,0 +1 @@
from . import admin, auth, health, user

View File

@@ -0,0 +1 @@
from . import guilds, users, wallets

View File

@@ -0,0 +1,42 @@
from logging import Logger
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from starlette.requests import Request
from javelina.classes import PycordGuild
from javelina.classes.errors import GuildNotFoundError
from javelina.classes.fastapi import FastAPI
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1/admin/guilds", tags=["Admin - Guilds"])
# TODO Implement this method
@router_v1.get("/", response_class=JSONResponse)
async def get_guilds_v1() -> JSONResponse:
raise HTTPException(status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented")
@router_v1.get("/{guild_id}", response_class=JSONResponse)
async def get_guild_v1(request: Request, guild_id: int) -> JSONResponse:
try:
guild: PycordGuild = await PycordGuild.from_id(
guild_id, allow_creation=False, cache=request.app.bot.cache
)
except GuildNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Guild not found"
) from exc
except NotImplementedError as exc:
raise HTTPException(
status_code=status.HTTP_501_NOT_IMPLEMENTED, detail="Not implemented"
) from exc
return JSONResponse(guild.to_dict(json_compatible=True))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,34 @@
from logging import Logger
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from starlette.requests import Request
from javelina.classes import PycordUser
from javelina.classes.errors import UserNotFoundError
from javelina.classes.fastapi import FastAPI
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(
prefix="/v1/admin/guilds/{guild_id}/users", tags=["Admin - Users"]
)
@router_v1.get("/{user_id}", response_class=JSONResponse)
async def get_guild_user_v1(request: Request, guild_id: int, user_id: int) -> JSONResponse:
try:
user: PycordUser = await PycordUser.from_id(
user_id, guild_id, allow_creation=False, cache=request.app.bot.cache
)
except UserNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="User not found"
) from exc
return JSONResponse(user.to_dict(json_compatible=True))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,34 @@
from logging import Logger
from fastapi import APIRouter, HTTPException, status
from fastapi.responses import JSONResponse
from starlette.requests import Request
from javelina.classes.errors import WalletNotFoundError
from javelina.classes.fastapi import FastAPI
from javelina.classes.wallet import Wallet
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(
prefix="/v1/admin/guilds/{guild_id}/wallets", tags=["Admin - Wallets"]
)
@router_v1.get("/{user_id}", response_class=JSONResponse)
async def get_guild_wallet_v1(request: Request, guild_id: int, user_id: int) -> JSONResponse:
try:
wallet: Wallet = await Wallet.from_id(
user_id, guild_id, allow_creation=False, cache=request.app.bot.cache
)
except WalletNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Wallet not found"
) from exc
return JSONResponse(wallet.to_dict(json_compatible=True))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,27 @@
from logging import Logger
from fastapi import APIRouter
from starlette.responses import JSONResponse
from javelina.api.app import discord_oauth
from javelina.classes.fastapi import FastAPI
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1", tags=["Auth"])
@router_v1.get("/login", response_class=JSONResponse)
async def login() -> JSONResponse:
return JSONResponse({"url": discord_oauth.oauth_login_url})
@router_v1.get("/callback", response_class=JSONResponse)
async def callback(code: str) -> JSONResponse:
token, refresh_token = await discord_oauth.get_access_token(code)
return JSONResponse({"access_token": token, "refresh_token": refresh_token})
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,37 @@
from datetime import datetime, timedelta
from logging import Logger
from typing import Optional
from zoneinfo import ZoneInfo
from fastapi import APIRouter, Request
from starlette.responses import JSONResponse
from javelina.classes import ApplicationHealth
from javelina.classes.fastapi import FastAPI
from javelina.modules.database import db
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1", tags=["User"])
@router_v1.get("/status", response_class=JSONResponse)
async def get_status_v1() -> JSONResponse:
return JSONResponse({"status": "ok"})
@router_v1.get("/health", response_class=JSONResponse)
async def get_health_v1(request: Request, detailed: Optional[bool] = False) -> JSONResponse:
if request.app.status is None or request.app.status.get_last_update() < (
datetime.now(tz=ZoneInfo("UTC")) - timedelta(seconds=30)
):
request.app.update_status(await ApplicationHealth.from_data(request.app, db))
health: ApplicationHealth = request.app.status
return JSONResponse(health.to_json(detailed=detailed if detailed is not None else False))
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,27 @@
from logging import Logger
from fastapi import APIRouter, Depends
from fastapi_discord import User
from starlette.responses import JSONResponse
from javelina.api.app import discord_oauth
from javelina.classes.fastapi import FastAPI
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
router_v1: APIRouter = APIRouter(prefix="/v1", tags=["User"])
@router_v1.get(
"/me",
dependencies=[Depends(discord_oauth.requires_authorization)],
response_model=User,
response_class=JSONResponse,
)
async def get_me_v1(user: User = Depends(discord_oauth.user)) -> User:
return user
def setup(app: FastAPI) -> None:
app.include_router(router_v1)

View File

@@ -0,0 +1,10 @@
from .application_health import ApplicationHealth
from .consent import Consent
from .guild_rules import GuildRules
from .pycord_guild import PycordGuild
from .pycord_user import PycordUser
from .scheduled_action import ScheduledAction
from .service_status import ServiceStatus
# from .pycord_guild_colors import PycordGuildColors
# from .wallet import Wallet

View File

@@ -0,0 +1 @@
from .cacheable import Cacheable

View File

@@ -0,0 +1,81 @@
from abc import ABC, abstractmethod
from typing import Any, ClassVar, Dict, Optional
from libbot.cache.classes import Cache
from pymongo.asynchronous.collection import AsyncCollection
class Cacheable(ABC):
"""Abstract class for cacheable"""
__short_name__: str
__collection__: ClassVar[AsyncCollection]
@classmethod
@abstractmethod
async def from_id(cls, *args: Any, cache: Optional[Cache] = None, **kwargs: Any) -> Any:
pass
@abstractmethod
async def _set(self, cache: Optional[Cache] = None, **kwargs: Any) -> None:
pass
@abstractmethod
async def _remove(self, *args: str, cache: Optional[Cache] = None) -> None:
pass
@abstractmethod
def _get_cache_key(self) -> str:
pass
@abstractmethod
def _update_cache(self, cache: Optional[Cache] = None) -> None:
pass
@abstractmethod
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
pass
@staticmethod
@abstractmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
pass
@staticmethod
@abstractmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
pass
@abstractmethod
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
pass
@staticmethod
@abstractmethod
def get_defaults(**kwargs: Any) -> Dict[str, Any]:
pass
@staticmethod
@abstractmethod
def get_default_value(key: str) -> Any:
pass
@abstractmethod
async def update(
self,
cache: Optional[Cache] = None,
**kwargs: Any,
) -> None:
pass
@abstractmethod
async def reset(
self,
*args: str,
cache: Optional[Cache] = None,
) -> None:
pass
@abstractmethod
async def purge(self, cache: Optional[Cache] = None) -> None:
pass

View File

@@ -0,0 +1,155 @@
from dataclasses import dataclass
from datetime import datetime
from logging import Logger
from typing import Any, Dict, Optional
from zoneinfo import ZoneInfo
from libbot.cache.classes import Cache
from libbot.pycord.classes import PycordBot
from pymongo.asynchronous.database import AsyncDatabase
from pymongo.errors import ConnectionFailure
from javelina.classes.fastapi import FastAPI
from javelina.classes.service_status import ServiceStatus
from javelina.enums import HealthStatus
from javelina.modules.database import db_client
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
@dataclass
class ApplicationHealth:
_last_update: datetime
api: ServiceStatus
bot: ServiceStatus
cache: ServiceStatus
database: ServiceStatus
@classmethod
async def from_data(cls, app: FastAPI, database: AsyncDatabase) -> "ApplicationHealth":
database_health: ServiceStatus = await ApplicationHealth.get_database_health(database)
cache_health: ServiceStatus = ApplicationHealth.get_cache_health(app.bot.cache)
data: Dict[str, Any] = {
"bot": ApplicationHealth.get_bot_health(app.bot, cache_health, database_health),
"cache": cache_health,
"database": database_health,
}
data["api"] = ApplicationHealth.get_api_health(
data["bot"], data["cache"], database_health
)
data["_last_update"] = datetime.now(tz=ZoneInfo("UTC"))
return cls(**data)
def update(self, app: FastAPI, database: AsyncDatabase) -> None:
raise NotImplementedError()
# TODO Fix the message
@staticmethod
def get_bot_health(
bot: PycordBot, cache_status: ServiceStatus, database_status: ServiceStatus
) -> ServiceStatus:
if not bot.is_ready():
return ServiceStatus(HealthStatus.FAILED, "discord connection has failed")
if database_status.status != HealthStatus.OPERATIONAL:
match database_status.status:
case HealthStatus.FAILED, HealthStatus.UNKNOWN:
return ServiceStatus(HealthStatus.FAILED, "database connection has failed")
case HealthStatus.DEGRADED:
return ServiceStatus(
HealthStatus.DEGRADED, "database connection is degraded"
)
if cache_status.status not in [HealthStatus.UNKNOWN, HealthStatus.OPERATIONAL]:
match cache_status.status:
case HealthStatus.FAILED:
return ServiceStatus(HealthStatus.DEGRADED, "cache connection has failed")
case HealthStatus.DEGRADED:
return ServiceStatus(HealthStatus.DEGRADED, "cache is degraded")
return ServiceStatus(HealthStatus.OPERATIONAL, None)
# TODO Fix the message
# TODO Implement this method
@staticmethod
def get_cache_health(cache: Cache) -> ServiceStatus:
return ServiceStatus(
HealthStatus.UNKNOWN,
None,
)
# TODO Fix the message
@staticmethod
async def get_database_health(database: AsyncDatabase) -> ServiceStatus:
try:
await db_client.admin.command("ping")
except ConnectionFailure as exc:
return ServiceStatus(HealthStatus.FAILED, str(exc))
return ServiceStatus(
HealthStatus.OPERATIONAL,
None,
)
@staticmethod
def get_api_health(
bot_status: ServiceStatus, cache_status: ServiceStatus, database_status: ServiceStatus
) -> ServiceStatus:
if database_status.status != HealthStatus.OPERATIONAL:
match database_status.status:
case HealthStatus.FAILED, HealthStatus.UNKNOWN:
return ServiceStatus(
HealthStatus.FAILED,
"database connection has failed",
)
case HealthStatus.DEGRADED:
return ServiceStatus(
HealthStatus.DEGRADED,
"database connection is degraded",
)
if bot_status.status != HealthStatus.OPERATIONAL:
match bot_status.status:
case HealthStatus.FAILED, HealthStatus.UNKNOWN:
return ServiceStatus(
HealthStatus.DEGRADED,
"bot integration has failed",
)
case HealthStatus.DEGRADED:
return ServiceStatus(
HealthStatus.DEGRADED,
"bot integration is degraded",
)
if cache_status.status not in [HealthStatus.OPERATIONAL, HealthStatus.UNKNOWN]:
match cache_status.status:
case HealthStatus.FAILED:
return ServiceStatus(HealthStatus.DEGRADED, "cache connection has failed")
case HealthStatus.DEGRADED:
return ServiceStatus(HealthStatus.DEGRADED, "cache is degraded")
return ServiceStatus(
HealthStatus.OPERATIONAL,
None,
)
def get_last_update(self) -> datetime:
return self._last_update
def to_json(self, detailed: Optional[bool] = False) -> Dict[str, Dict[str, str | None]]:
output: Dict[str, Any] = {
"api": self.api.to_json(detailed),
"bot": self.bot.to_json(detailed),
}
if detailed:
output["cache"] = self.cache.to_json(detailed)
output["database"] = self.database.to_json(detailed)
return output

View File

@@ -0,0 +1 @@
from .base_cacheable import BaseCacheable

View File

@@ -0,0 +1,110 @@
from abc import ABC
from logging import Logger
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from javelina.classes.abstract import Cacheable
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
class BaseCacheable(Cacheable, ABC):
"""Base implementation of Cacheable used by all cachable classes."""
_id: ObjectId
async def _set(self, cache: Optional[Cache] = None, **kwargs: Any) -> None:
for key, value in kwargs.items():
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await self.__collection__.update_one({"_id": self._id}, {"$set": kwargs})
self._update_cache(cache)
logger.info("Set attributes of %s to %s", self._id, kwargs)
async def _remove(self, *args: str, cache: Optional[Cache] = None) -> None:
attributes: Dict[str, Any] = {}
for key in args:
if not hasattr(self, key):
raise AttributeError()
default_value: Any = self.get_default_value(key)
setattr(self, key, default_value)
attributes[key] = default_value
await self.__collection__.update_one({"_id": self._id}, {"$set": attributes})
self._update_cache(cache)
logger.info("Reset attributes %s of %s to default values", args, self._id)
def _update_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
object_dict: Dict[str, Any] = self.to_dict(json_compatible=True)
if object_dict is not None:
cache.set_json(self._get_cache_key(), object_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
async def update(
self,
cache: Optional[Cache] = None,
**kwargs: Any,
) -> None:
"""Update attribute(s) on the object and save the updated entry into the database.
Args:
cache (:obj:`Cache`, optional): Cache engine that will be used to update the cache.
**kwargs (Any): Mapping of attributes in format `attribute_name=attribute_value` to update.
Raises:
AttributeError: Provided attribute does not exist in the class.
"""
await self._set(cache=cache, **kwargs)
async def reset(
self,
*args: str,
cache: Optional[Cache] = None,
) -> None:
"""Remove attribute(s) on the object, replace them with a default value and save the updated entry into the database.
Args:
*args (str): List of attributes to remove.
cache (:obj:`Cache`, optional): Cache engine that will be used to update the cache.
Raises:
AttributeError: Provided attribute does not exist in the class.
"""
await self._remove(*args, cache=cache)
async def purge(self, cache: Optional[Cache] = None) -> None:
"""Completely remove object data from database. Currently only removes the record from a respective collection.
Args:
cache (:obj:`Cache`, optional): Cache engine that will be used to update the cache.
"""
await self.__collection__.delete_one({"_id": self._id})
self._delete_cache(cache)
logger.info("Purged %s from the database", self._id)

245
javelina/classes/consent.py Normal file
View File

@@ -0,0 +1,245 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from zoneinfo import ZoneInfo
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo import DESCENDING
from pymongo.results import InsertOneResult, UpdateResult
from javelina.classes.base import BaseCacheable
from javelina.enums import ConsentScope
from javelina.modules.database import col_consents
from javelina.modules.utils import restore_from_cache
@dataclass
class Consent(BaseCacheable):
"""Dataclass of DB entry of a consent entry"""
__slots__ = (
"_id",
"user_id",
"guild_id",
"scope",
"consent_date",
"expiration_date",
"withdrawal_date",
)
__short_name__ = "consent"
__collection__ = col_consents
_id: ObjectId
user_id: int
guild_id: int
scope: ConsentScope
consent_date: datetime
expiration_date: datetime | None
withdrawal_date: datetime | None
@staticmethod
def get_cache_key(user_id: int, guild_id: int, scope: ConsentScope) -> str:
return f"{Consent.__short_name__}_{user_id}_{guild_id}_{scope.value}"
# TODO Implement this method
@classmethod
async def from_id(cls, id: ObjectId, cache: Optional[Cache] = None, **kwargs: Any) -> Any:
raise NotImplementedError()
# TODO Add documentation
@classmethod
async def from_combination(
cls,
user_id: int,
guild_id: int,
scope: ConsentScope,
cache: Optional[Cache] = None,
) -> Any:
cached_entry: Dict[str, Any] | None = restore_from_cache(
cls.__short_name__, Consent.get_cache_key(user_id, guild_id, scope), cache=cache
)
if cached_entry is not None:
return cls(**cls._entry_from_cache(cached_entry))
db_entry: Dict[str, Any] | None = await cls.__collection__.find_one(
{"user_id": user_id, "guild_id": guild_id, "scope": scope.value},
sort={"expiration_date": DESCENDING},
)
if db_entry is None:
# TODO Implement a unique exception
# raise ConsentNotFoundError(user_id, guild_id, scope)
raise RuntimeError(
f"Could not find a consent of user {user_id} from {guild_id} for scope '{scope.value}'"
)
if cache is not None:
cache.set_json(
Consent.get_cache_key(user_id, guild_id, scope),
cls._entry_to_cache(db_entry),
)
return cls(**db_entry)
@classmethod
def from_entry(cls, db_entry: Dict[str, Any]) -> "Consent":
db_entry["scope"] = ConsentScope(db_entry["scope"])
return cls(**db_entry)
# TODO Add documentation
@classmethod
async def give(
cls,
user_id: int,
guild_id: int,
scope: ConsentScope,
expiration_date: Optional[datetime] = None,
cache: Optional[Cache] = None,
) -> Any:
await cls.withdraw_scope_consents(user_id, guild_id, scope)
db_entry = Consent.get_defaults(user_id, guild_id, scope)
db_entry["scope"] = scope.value
db_entry["expiration_date"] = expiration_date
insert_result: InsertOneResult = await cls.__collection__.insert_one(db_entry)
db_entry["scope"] = scope
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(
Consent.get_cache_key(user_id, guild_id, scope),
cls._entry_to_cache(db_entry),
)
return cls(**db_entry)
@staticmethod
async def withdraw_scope_consents(
user_id: int, guild_id: int, scope: ConsentScope
) -> UpdateResult:
"""Look up consents of a user in a guild with a specified scope and withdraw them.
Args:
user_id (int): Discord ID of a user.
guild_id (int): Discord ID of a guild.
scope (:obj:ConsentScope): Scope to look for.
Returns:
UpdateResult: Result object of all affected consents.
"""
return await Consent.__collection__.update_many(
{
"user_id": user_id,
"guild_id": guild_id,
"scope": scope.value,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
},
{"$set": {"withdrawal_date": datetime.now(tz=ZoneInfo("UTC"))}},
)
def _get_cache_key(self) -> str:
return self.get_cache_key(self.user_id, self.guild_id, self.scope)
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
cache_entry["scope"] = cache_entry["scope"].value
cache_entry["consent_date"] = cache_entry["consent_date"].isoformat()
cache_entry["expiration_date"] = (
None
if cache_entry["expiration_date"] is None
else cache_entry["expiration_date"].isoformat()
)
cache_entry["withdrawal_date"] = (
None
if cache_entry["withdrawal_date"] is None
else cache_entry["withdrawal_date"].isoformat()
)
return cache_entry
@staticmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
cache_entry["scope"] = ConsentScope(cache_entry["scope"])
cache_entry["consent_date"] = datetime.fromisoformat(cache_entry["consent_date"])
cache_entry["expiration_date"] = (
None
if cache_entry["expiration_date"] is None
else datetime.fromisoformat(cache_entry["expiration_date"])
)
cache_entry["withdrawal_date"] = (
None
if cache_entry["withdrawal_date"] is None
else datetime.fromisoformat(cache_entry["withdrawal_date"])
)
return db_entry
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert Consent object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of Consent
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"user_id": self.user_id,
"guild_id": self.guild_id,
"scope": self.scope if not json_compatible else self.scope.value,
"consent_date": self.consent_date.isoformat(),
"expiration_date": (
None if self.expiration_date is None else self.expiration_date.isoformat()
),
"withdrawal_date": (
None if self.withdrawal_date is None else self.withdrawal_date.isoformat()
),
}
@staticmethod
def get_defaults(
user_id: Optional[int] = None,
guild_id: Optional[int] = None,
scope: Optional[ConsentScope] = None,
) -> Dict[str, Any]:
return {
"user_id": user_id,
"guild_id": guild_id,
"scope": scope,
"consent_date": datetime.now(tz=ZoneInfo("UTC")),
"expiration_date": None,
"withdrawal_date": None,
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in Consent.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in Consent")
return Consent.get_defaults()[key]
async def withdraw(self, cache: Optional[Cache] = None) -> None:
"""Withdraw consent now (in UTC timezone).
Args:
cache (:obj:`Cache`, optional): Cache engine that will be used to update the cache.
"""
await self.update(cache=cache, withdrawal_date=datetime.now(tz=ZoneInfo("UTC")))

View File

@@ -0,0 +1,108 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from javelina.classes.base import BaseCacheable
from javelina.modules.database import col_custom_channels
@dataclass
class CustomChannel(BaseCacheable):
"""Dataclass of DB entry of a custom channel"""
__slots__ = (
"_id",
"owner_id",
"guild_id",
"channel_id",
"allow_comments",
"allow_reactions",
"created",
"deleted",
)
__short_name__ = "channel"
__collection__ = col_custom_channels
_id: ObjectId
owner_id: int
guild_id: int
channel_id: int
allow_comments: bool
allow_reactions: bool
created: datetime
deleted: datetime | None
@classmethod
async def from_id(
cls,
user_id: int,
guild_id: int,
channel_id: Optional[int] = None,
cache: Optional[Cache] = None,
) -> "CustomChannel":
raise NotImplementedError()
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordGuild object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordGuild
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
"locale": self.locale,
}
async def _set(self, cache: Optional[Cache] = None, **kwargs: Any) -> None:
await super()._set(cache, **kwargs)
async def _remove(self, *args: str, cache: Optional[Cache] = None) -> None:
await super()._remove(*args, cache=cache)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self.id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
super()._update_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
super()._delete_cache(cache)
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
return cache_entry
@staticmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
return db_entry
# TODO Add documentation
@staticmethod
def get_defaults(guild_id: Optional[int] = None) -> Dict[str, Any]:
return {
"id": guild_id,
"locale": None,
}
# TODO Add documentation
@staticmethod
def get_default_value(key: str) -> Any:
if key not in CustomChannel.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in CustomChannel")
return CustomChannel.get_defaults()[key]

View File

@@ -2,6 +2,6 @@ class GuildNotFoundError(Exception):
"""PycordGuild could not find guild with such an ID in the database"""
def __init__(self, guild_id: int) -> None:
self.guild_id = guild_id
self.guild_id: int = guild_id
super().__init__(f"Guild with id {self.guild_id} was not found")

View File

@@ -1,7 +1,8 @@
class UserNotFoundError(Exception):
"""PycordUser could not find user with such an ID in the database"""
def __init__(self, user_id: int) -> None:
self.user_id = user_id
def __init__(self, user_id: int, guild_id: int) -> None:
self.user_id: int = user_id
self.guild_id: int = guild_id
super().__init__(f"User with id {self.user_id} was not found")
super().__init__(f"User with id {self.user_id} was not found in guild {self.guild_id}")

View File

@@ -0,0 +1,18 @@
from typing import Any, Optional
from fastapi import FastAPI as OriginalFastAPI
from libbot.pycord.classes import PycordBot
class FastAPI(OriginalFastAPI):
def __init__(self, *args, bot: Optional[PycordBot] = None, **kwargs) -> None:
self.bot: PycordBot | None = bot
self.status: Any | None = None
super().__init__(*args, **kwargs)
def set_bot(self, bot: PycordBot) -> None:
self.bot = bot
def update_status(self, status: Any) -> None:
self.status = status

View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass
from typing import Any, Dict, List
from javelina.classes.guild_rules_section import GuildRulesSection
# Example JSON
# {
# "header": "These are our rules",
# "sections": [
# {
# "title": "1. First section",
# "description": "This sections contains some rules",
# "rules": [
# {
# "title": "Example rule",
# "content": "Do not wear sandals while in socks!",
# "punishment": 0,
# }
# ],
# }
# ],
# }
@dataclass
class GuildRules:
__slots__ = ("header", "sections")
header: str
sections: List[GuildRulesSection]
# TODO Implement this method
@classmethod
def from_json(cls, db_entry: Dict[str, Any]) -> "GuildRules":
raise NotImplementedError()
# TODO Implement this method
def to_dict(self) -> Dict[str, Any]:
raise NotImplementedError()

View File

@@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import Literal
from javelina.enums import Punishment
@dataclass
class GuildRulesRule:
__slots__ = ("title", "content", "punishment")
title: str
content: str
punishment: Literal[Punishment.WARNING, Punishment.MUTE, Punishment.KICK, Punishment.BAN]

View File

@@ -0,0 +1,13 @@
from dataclasses import dataclass
from typing import List
from javelina.classes.guild_rules_rule import GuildRulesRule
@dataclass
class GuildRulesSection:
__slots__ = ("title", "description", "rules")
title: str
description: str
rules: List[GuildRulesRule]

View File

@@ -0,0 +1,248 @@
import logging
from datetime import datetime, timedelta
from logging import Logger
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional
from zoneinfo import ZoneInfo
from aiohttp import ClientSession
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from discord import Activity, ActivityType, Colour, Embed, Guild, User
from libbot.cache.classes import CacheMemcached, CacheRedis
from libbot.cache.manager import create_cache_client
from libbot.i18n import BotLocale
from libbot.pycord.classes import PycordBot as LibPycordBot
from libbot.utils import json_read
from javelina.classes import PycordGuild, PycordUser, ScheduledAction
from javelina.enums import CacheTTL, EmbedColor
from javelina.modules.database import _update_database_indexes, col_scheduled_actions
from javelina.modules.utils.color_utils import hex_to_int
logger: Logger = logging.getLogger(__name__)
# from javelina.modules.tracking.dhl import update_tracks_dhl
class PycordBot(LibPycordBot):
__version__ = "0.0.1"
started: datetime
cache: CacheMemcached | CacheRedis | None = None
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._set_cache_engine()
self.client_session = ClientSession()
# This replacement exists because of the different
# i18n formats than provided by libbot
self._ = self._modified_string_getter
if self.scheduler is None:
return
def _set_cache_engine(self) -> None:
cache_type: Literal["redis", "memcached"] | None = self.config["cache"]["type"]
if "cache" in self.config and cache_type is not None:
self.cache = create_cache_client(
self.config,
cache_type,
prefix=self.config["cache"][cache_type]["prefix"],
default_ttl_seconds=CacheTTL.NORMAL.value,
)
def _modified_string_getter(self, key: str, *args: str, locale: str | None = None) -> Any:
"""This method exists because of the different i18n formats than provided by libbot.
It splits "-" and takes the first part of the provided locale to make complex language codes
compatible with an easy libbot approach to i18n.
"""
return self.bot_locale._(
key, *args, locale=None if locale is None else locale.split("-")[0]
)
# TODO Add documentation
async def get_scheduled_actions(self) -> List[ScheduledAction]:
return [
ScheduledAction.from_entry(entry) async for entry in col_scheduled_actions.find()
]
# TODO Add rollback mechanism for recovery from broken config
# TODO Add documentation
def reload(self) -> None:
config_old: Dict[str, Any] = self.config.copy()
try:
self.config = json_read(Path("config.json"))
self.bot_locale = BotLocale(
default_locale=self.config["locale"],
)
self.default_locale = self.bot_locale.default
self.locales = self.bot_locale.locales
except Exception as exc:
logger.error(
"Could not reload the configuration, restoring old in-memory values due to: %s",
exc,
exc_info=exc,
)
self.config = config_old
raise exc
async def set_status(self) -> None:
activity_enabled: bool = self.config["bot"]["status"]["enabled"]
activity_id: int = self.config["bot"]["status"]["activity_type"]
activity_message: str = self.config["bot"]["status"]["activity_text"]
if not activity_enabled:
logger.info("Activity is disabled")
return
try:
activity_type: ActivityType = ActivityType(activity_id)
except Exception as exc:
logger.debug(
"Could not activity with ID %s to ActivityType due to: %s",
activity_id,
exc,
exc_info=exc,
)
logger.error("Activity type with ID %s is not supported", activity_id)
return
await self.change_presence(activity=Activity(type=activity_type, name=activity_message))
logger.info(
"Set activity type to %s (%s) with message '%s'",
activity_id,
activity_type.name,
activity_message,
)
async def find_user(self, user: int | User, guild_id: int) -> PycordUser:
"""Find User by its ID or User object.
Args:
user (int | User): ID or User object to extract ID from
guild_id (int): ID of the guild user is member of
Returns:
PycordUser: User object
Raises:
UserNotFoundException: User was not found and creation was not allowed
"""
return (
await PycordUser.from_id(user, guild_id, cache=self.cache)
if isinstance(user, int)
else await PycordUser.from_id(user.id, guild_id, cache=self.cache)
)
async def find_guild(self, guild: int | Guild) -> PycordGuild:
"""Find Guild by its ID or Guild object.
Args:
guild (int | Guild): ID or User object to extract ID from
Returns:
PycordGuild: Guild object
Raises:
GuildNotFoundException: Guild was not found and creation was not allowed
"""
return (
await PycordGuild.from_id(guild, cache=self.cache)
if isinstance(guild, int)
else await PycordGuild.from_id(guild.id, cache=self.cache)
)
async def start(self, *args: Any, **kwargs: Any) -> None:
await self._schedule_tasks()
await _update_database_indexes()
self.started = datetime.now(tz=ZoneInfo("UTC"))
await super().start(*args, **kwargs)
async def close(self, **kwargs) -> None:
await self.client_session.close()
if self.scheduler is not None:
self.scheduler.shutdown()
await super().close(**kwargs)
async def _schedule_tasks(self) -> None:
if self.scheduler is None:
return
for scheduled_action in await self.get_scheduled_actions():
match scheduled_action.action_type:
# case ScheduledActionType.WEATHER_REPORT:
# func: Callable | None = None
case _:
continue
self.scheduler.add_job(
func,
trigger=CronTrigger.from_crontab(scheduled_action.action_schedule),
args=[self, scheduled_action.action_data],
)
if scheduled_action.invoke_on_start:
self.scheduler.add_job(
func,
trigger=DateTrigger(datetime.now() + timedelta(seconds=5)),
args=[self, scheduled_action.action_data],
)
# Scheduler job for DHL parcel tracking
# self.scheduler.add_job(
# update_tracks_dhl,
# trigger="cron",
# hour=self.config["modules"]["tracking"]["fetch_hours"],
# args=[self, self.client_session],
# )
pass
# TODO Add support for guild colors
def create_embed(
self,
title: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = None,
color: Optional[EmbedColor] = EmbedColor.PRIMARY,
) -> Embed:
return Embed(
title=title,
description=description,
url=url,
color=Colour(hex_to_int(self.config["colors"][color.value])),
)
def create_embed_error(
self,
title: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = None,
) -> Embed:
return self.create_embed(
title=title, description=description, url=url, color=EmbedColor.ERROR
)
def create_embed_success(
self,
title: Optional[str] = None,
description: Optional[str] = None,
url: Optional[str] = None,
) -> Embed:
return self.create_embed(
title=title, description=description, url=url, color=EmbedColor.SUCCESS
)

View File

@@ -0,0 +1,145 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from javelina.classes import GuildRules
from javelina.classes.base import BaseCacheable
from javelina.classes.errors import GuildNotFoundError
from javelina.modules.database import col_guilds
from javelina.modules.utils import restore_from_cache
@dataclass
class PycordGuild(BaseCacheable):
"""Dataclass of DB entry of a guild"""
__slots__ = ("_id", "id", "locale", "rules")
__short_name__ = "guild"
__collection__ = col_guilds
_id: ObjectId
id: int
locale: str
rules: GuildRules
@classmethod
async def from_id(
cls, guild_id: int, allow_creation: bool = True, cache: Optional[Cache] = None
) -> "PycordGuild":
"""Find the guild by its ID and construct PycordEventStage from database entry.
Args:
guild_id (int): ID of the guild to look up.
allow_creation (:obj:`bool`, optional): Create a new record if none found in the database.
cache (:obj:`Cache`, optional): Cache engine that will be used to fetch and update the cache.
Returns:
PycordGuild: Object of the found or newly created guild.
Raises:
GuildNotFoundError: Guild with such ID does not exist and creation was not allowed.
"""
cached_entry: Dict[str, Any] | None = restore_from_cache(
cls.__short_name__, guild_id, cache=cache
)
if cached_entry is not None:
return cls(**cls._entry_from_cache(cached_entry))
db_entry: Dict[str, Any] | None = await cls.__collection__.find_one({"id": guild_id})
if db_entry is None:
if not allow_creation:
raise GuildNotFoundError(guild_id)
db_entry = PycordGuild.get_defaults(guild_id)
insert_result: InsertOneResult = await cls.__collection__.insert_one(db_entry)
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(f"{cls.__short_name__}_{guild_id}", cls._entry_to_cache(db_entry))
db_entry["rules"] = (
None if db_entry["rules"] is None else GuildRules.from_json(db_entry["rules"])
)
return cls(**db_entry)
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordGuild object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordGuild
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
"locale": self.locale,
"rules": None if self.rules is None else self.rules.to_dict(),
}
async def _set(self, cache: Optional[Cache] = None, **kwargs: Any) -> None:
await super()._set(cache, **kwargs)
async def _remove(self, *args: str, cache: Optional[Cache] = None) -> None:
await super()._remove(*args, cache=cache)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self.id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
super()._update_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
super()._delete_cache(cache)
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
if cache_entry["rules"] is not None and isinstance(cache_entry["rules"], GuildRules):
cache_entry["rules"] = cache_entry["rules"].to_json()
return cache_entry
@staticmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
db_entry["rules"] = (
None if db_entry["rules"] is None else GuildRules.from_json(db_entry["rules"])
)
return db_entry
# TODO Add documentation
@staticmethod
def get_defaults(guild_id: Optional[int] = None) -> Dict[str, Any]:
return {"id": guild_id, "locale": None, "rules": None}
# TODO Add documentation
@staticmethod
def get_default_value(key: str) -> Any:
if key not in PycordGuild.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in PycordGuild")
return PycordGuild.get_defaults()[key]
# TODO Add documentation
async def set_rules(self, rules: GuildRules, cache: Optional[Cache] = None) -> None:
await self.update(cache=cache, rules=rules.to_dict())
# TODO Add documentation
async def clear_rules(self, cache: Optional[Cache] = None) -> None:
await self.update(cache=cache, rules=None)

View File

@@ -0,0 +1,277 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from logging import Logger
from typing import Any, Dict, List, Optional
from zoneinfo import ZoneInfo
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from javelina.classes import Consent
from javelina.classes.base import BaseCacheable
from javelina.classes.errors.pycord_user import UserNotFoundError
from javelina.classes.wallet import Wallet
from javelina.enums import ConsentScope
from javelina.modules.database import col_users
from javelina.modules.utils import restore_from_cache
logger: Logger = logging.getLogger(__name__)
@dataclass
class PycordUser(BaseCacheable):
"""Dataclass of DB entry of a user"""
__slots__ = ("_id", "id", "guild_id")
__short_name__ = "user"
__collection__ = col_users
_id: ObjectId
id: int
guild_id: int
@classmethod
async def from_id(
cls,
user_id: int,
guild_id: int,
allow_creation: bool = True,
cache: Optional[Cache] = None,
) -> "PycordUser":
"""Find user in database and create new record if user does not exist.
Args:
user_id (int): User's Discord ID
guild_id (int): User's guild Discord ID
allow_creation (:obj:`bool`, optional): Create new user record if none found in the database
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
PycordUser: User object
Raises:
UserNotFoundError: User was not found and creation was not allowed
"""
cached_entry: Dict[str, Any] | None = restore_from_cache(
cls.__short_name__, f"{user_id}_{guild_id}", cache=cache
)
if cached_entry is not None:
return cls(**cls._entry_from_cache(cached_entry))
db_entry: Dict[str, Any] | None = await cls.__collection__.find_one(
{"id": user_id, "guild_id": guild_id}
)
if db_entry is None:
if not allow_creation:
raise UserNotFoundError(user_id, guild_id)
db_entry = PycordUser.get_defaults(user_id, guild_id)
insert_result: InsertOneResult = await cls.__collection__.insert_one(db_entry)
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(
f"{cls.__short_name__}_{user_id}_{guild_id}", cls._entry_to_cache(db_entry)
)
return cls(**db_entry)
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordUser object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordUser
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
"guild_id": self.guild_id,
}
async def _set(self, cache: Optional[Cache] = None, **kwargs: Any) -> None:
await super()._set(cache, **kwargs)
async def _remove(self, *args: str, cache: Optional[Cache] = None) -> None:
await super()._remove(*args, cache=cache)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self.id}_{self.guild_id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
super()._update_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
super()._delete_cache(cache)
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
return cache_entry
@staticmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
return db_entry
# TODO Add documentation
@staticmethod
def get_defaults(
user_id: Optional[int] = None, guild_id: Optional[int] = None
) -> Dict[str, Any]:
return {
"id": user_id,
"guild_id": guild_id,
}
# TODO Add documentation
@staticmethod
def get_default_value(key: str) -> Any:
if key not in PycordUser.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in PycordUser")
return PycordUser.get_defaults()[key]
async def update(
self,
cache: Optional[Cache] = None,
**kwargs: Any,
) -> None:
await super().update(cache=cache, **kwargs)
async def reset(
self,
*args: str,
cache: Optional[Cache] = None,
) -> None:
await super().reset(*args, cache=cache)
async def purge(self, cache: Optional[Cache] = None) -> None:
await super().purge(cache)
async def get_wallet(self, guild_id: int, cache: Optional[Cache] = None) -> Wallet:
"""Get wallet of the user.
Args:
guild_id (int): Guild ID of the wallet
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
Wallet: Wallet object of the user
"""
return await Wallet.from_id(self.id, guild_id, cache=cache)
# TODO Add documentation
async def has_active_consent(self, scope: ConsentScope) -> bool:
# TODO Test this query
consent: Dict[str, Any] | None = await Consent.__collection__.find_one(
{
"user_id": self.id,
"guild_id": self.guild_id,
"scope": scope.value,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
}
)
return consent is not None
# TODO Add documentation
async def give_consent(
self,
scope: ConsentScope,
expiration_date: Optional[datetime] = None,
cache: Optional[Cache] = None,
) -> None:
await Consent.give(self.id, self.guild_id, scope, expiration_date, cache=cache)
# TODO Add documentation
async def withdraw_consent(
self,
scope: ConsentScope,
cache: Optional[Cache] = None,
) -> None:
# TODO Test this query
await Consent.__collection__.update_many(
{
"user_id": self.id,
"guild_id": self.guild_id,
"scope": scope.value,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
},
{"$set": {"withdrawal_date": datetime.now(tz=ZoneInfo("UTC"))}},
)
if cache is not None:
cache.delete(Consent.get_cache_key(self.id, self.guild_id, scope))
# TODO Add documentation
async def withdraw_all_consents(
self,
cache: Optional[Cache] = None,
) -> None:
filter: Dict[str, Any] = {
"user_id": self.id,
"guild_id": self.guild_id,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
}
consents: List[Dict[str, Any]] | None = await Consent.__collection__.find(
filter
).to_list()
await Consent.__collection__.update_many(
filter,
{"$set": {"withdrawal_date": datetime.now(tz=ZoneInfo("UTC"))}},
)
if cache is not None and consents is not None:
for consent in consents:
cache.delete(
Consent.get_cache_key(
self.id, self.guild_id, ConsentScope(consent["scope"])
)
)
async def get_consents(self) -> List[Consent]:
consents: List[Consent] = []
async for consent in Consent.__collection__.find(
{
"user_id": self.id,
"guild_id": self.guild_id,
"withdrawal_date": None,
"$or": [
{"expiration_date": {"$gte": datetime.now(tz=ZoneInfo("UTC"))}},
{"expiration_date": None},
],
}
):
consents.append(Consent.from_entry(consent))
return consents

View File

@@ -0,0 +1,26 @@
from dataclasses import dataclass
from typing import Any, Dict
from bson import ObjectId
from javelina.enums import ScheduledActionType
@dataclass
class ScheduledAction:
"""Dataclass of DB entry of a scheduled action"""
__slots__ = ("_id", "action_type", "action_schedule", "action_data", "invoke_on_start")
_id: ObjectId
action_type: ScheduledActionType
action_schedule: str
action_data: Dict[str, Any] | None
invoke_on_start: bool
# TODO Add documentation
@classmethod
def from_entry(cls, db_entry: Dict[str, Any]) -> "ScheduledAction":
db_entry["action_type"] = ScheduledActionType(db_entry["action_type"])
return cls(**db_entry)

View File

@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import Dict, Literal, Optional
from javelina.enums import HealthStatus
@dataclass
class ServiceStatus:
status: Literal[
HealthStatus.OPERATIONAL,
HealthStatus.DEGRADED,
HealthStatus.FAILED,
HealthStatus.UNKNOWN,
]
message: str | None
def to_json(self, detailed: Optional[bool] = False) -> Dict[str, str | None]:
output: Dict[str, str | None] = {"status": self.status.value}
if detailed:
output["message"] = self.message
return output

View File

@@ -5,21 +5,30 @@ from logging import Logger
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from classes.errors.wallet import (
from javelina.classes.base import BaseCacheable
from javelina.classes.errors.wallet import (
WalletBalanceLimitExceeded,
WalletInsufficientFunds,
WalletNotFoundError,
WalletOverdraftLimitExceeded,
WalletInsufficientFunds,
)
from modules.database import col_wallets
from javelina.modules.database import col_wallets
from javelina.modules.utils import restore_from_cache
logger: Logger = logging.getLogger(__name__)
@dataclass
class Wallet:
class Wallet(BaseCacheable):
"""Dataclass of DB entry of a wallet"""
__slots__ = ("_id", "owner_id", "guild_id", "balance", "is_frozen", "created")
__short_name__ = "wallet"
__collection__ = col_wallets
_id: ObjectId
owner_id: int
guild_id: int
@@ -30,8 +39,19 @@ class Wallet:
# TODO Write a docstring
@classmethod
async def from_id(
cls, owner_id: int, guild_id: int, allow_creation: bool = True
cls,
owner_id: int,
guild_id: int,
allow_creation: bool = True,
cache: Optional[Cache] = None,
) -> "Wallet":
cached_entry: Dict[str, Any] | None = restore_from_cache(
cls.__short_name__, f"{owner_id}_{guild_id}", cache=cache
)
if cached_entry is not None:
return cls(**cls._entry_from_cache(cached_entry))
db_entry = await col_wallets.find_one({"owner_id": owner_id, "guild_id": guild_id})
if db_entry is None:
@@ -44,8 +64,34 @@ class Wallet:
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(
f"{cls.__short_name__}_{owner_id}_{guild_id}", cls._entry_to_cache(db_entry)
)
return cls(**db_entry)
def _get_cache_key(self) -> str:
return f"{self.__short_name__}_{self.owner_id}_{self.guild_id}"
@staticmethod
def _entry_to_cache(db_entry: Dict[str, Any]) -> Dict[str, Any]:
cache_entry: Dict[str, Any] = db_entry.copy()
cache_entry["_id"] = str(cache_entry["_id"])
cache_entry["created"] = cache_entry["created"].isoformat()
return cache_entry
@staticmethod
def _entry_from_cache(cache_entry: Dict[str, Any]) -> Dict[str, Any]:
db_entry: Dict[str, Any] = cache_entry.copy()
db_entry["_id"] = ObjectId(db_entry["_id"])
db_entry["created"] = datetime.fromisoformat(db_entry["created"])
return db_entry
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert Wallet object to a JSON representation.
@@ -61,19 +107,9 @@ class Wallet:
"guild_id": self.guild_id,
"balance": self.balance,
"is_frozen": self.is_frozen,
"created": self.created,
"created": self.created if not json_compatible else self.created.isoformat(),
}
async def _set(self, key: str, value: Any) -> None:
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await col_wallets.update_one({"_id": self._id}, {"$set": {key: value}}, upsert=True)
logger.info("Set attribute '%s' of the wallet %s to '%s'", key, str(self._id), value)
@staticmethod
def get_defaults(
owner_id: Optional[int] = None, guild_id: Optional[int] = None
@@ -94,21 +130,32 @@ class Wallet:
return Wallet.get_defaults()[key]
# TODO Write a docstring
async def freeze(self) -> None:
await self._set("is_frozen", True)
async def freeze(
self,
cache: Optional[Cache] = None,
) -> None:
await self.update(cache, is_frozen=True)
# TODO Write a docstring
async def unfreeze(self) -> None:
await self._set("is_frozen", False)
async def unfreeze(
self,
cache: Optional[Cache] = None,
) -> None:
await self.update(cache, is_frozen=False)
# TODO Write a docstring
async def deposit(self, amount: float, balance_limit: Optional[float] = None) -> float:
async def deposit(
self,
amount: float,
balance_limit: Optional[float] = None,
cache: Optional[Cache] = None,
) -> float:
new_balance: float = round(self.balance + amount, 2)
if balance_limit is not None and new_balance > balance_limit:
raise WalletBalanceLimitExceeded(self, amount, balance_limit)
await self._set("balance", new_balance)
await self.update(cache, balance=new_balance)
return new_balance
@@ -118,6 +165,7 @@ class Wallet:
amount: float,
allow_overdraft: bool = False,
overdraft_limit: Optional[float] = None,
cache: Optional[Cache] = None,
) -> float:
if amount > self.balance:
if not allow_overdraft or overdraft_limit is None:
@@ -128,7 +176,7 @@ class Wallet:
new_balance: float = round(self.balance - amount, 2)
await self._set("balance", new_balance)
await self.update(cache, balance=new_balance)
return new_balance
@@ -140,13 +188,16 @@ class Wallet:
balance_limit: Optional[float] = None,
allow_overdraft: bool = False,
overdraft_limit: Optional[float] = None,
cache: Optional[Cache] = None,
) -> None:
# TODO Replace with a concrete exception
if amount < 0:
raise ValueError()
# allow_creation might need to be set to False in the future
# if users will be able to opt out from having a wallet
wallet: Wallet = await self.from_id(
wallet_owner_id, wallet_guild_id, allow_creation=False
wallet_owner_id, wallet_guild_id, allow_creation=True, cache=cache
)
if balance_limit is not None and amount + wallet.balance > balance_limit:
@@ -160,5 +211,5 @@ class Wallet:
raise WalletOverdraftLimitExceeded(self, amount, overdraft_limit)
# TODO Make a sanity check to revert the transaction if anything goes wrong
await self.withdraw(amount, allow_overdraft, overdraft_limit)
await wallet.deposit(amount, balance_limit)
await self.withdraw(amount, allow_overdraft, overdraft_limit, cache=cache)
await wallet.deposit(amount, balance_limit, cache=cache)

112
javelina/cogs/cog_admin.py Normal file
View File

@@ -0,0 +1,112 @@
from typing import Callable, List
from discord import (
ApplicationContext,
Cog,
Forbidden,
InteractionContextType,
Message,
User,
default_permissions,
option,
slash_command,
)
from libbot.i18n import _, in_every_locale
from javelina.classes.pycord_bot import PycordBot
class CogAdmin(Cog):
"""Cog with the guessing command."""
def __init__(self, bot: PycordBot):
self.bot: PycordBot = bot
@slash_command(
name="reload",
description=_("description", "commands", "reload"),
description_localizations=in_every_locale("description", "commands", "reload"),
contexts={InteractionContextType.bot_dm},
)
async def command_reload(self, ctx: ApplicationContext) -> None:
if ctx.user.id not in self.bot.owner_ids:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._(
"permission_denied_title", "messages", "general", locale=ctx.locale
),
self.bot._("permission_denied", "messages", "general", locale=ctx.locale),
),
ephemeral=True,
)
return
try:
self.bot.reload()
await self.bot.set_status()
await ctx.respond(
embed=self.bot.create_embed_success(
self.bot._("success_title", "messages", "general", locale=ctx.locale),
self.bot._("config_reload_success", "messages", "admin", locale=ctx.locale),
)
)
except Exception as exc:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "general", locale=ctx.locale),
self.bot._(
"config_reload_error", "messages", "admin", locale=ctx.locale
).format(error=exc),
),
ephemeral=True,
)
# TODO Implement i18n
# TODO Account for rate limits
@slash_command(
name="clear",
description=_("description", "commands", "clear"),
description_localizations=in_every_locale("description", "commands", "clear"),
contexts={InteractionContextType.guild},
)
@option("amount", description="How many messages to delete", min_value=1, max_value=100)
@option("user", description="User whose balance to check (if not your own)", required=False)
@default_permissions(manage_messages=True)
async def command_clear(
self, ctx: ApplicationContext, amount: int, user: User = None
) -> None:
try:
check: Callable[[Message], bool] = (
(lambda msg: True) if user is None else (lambda msg: msg.author.id == user.id)
)
purged_messages: List[Message] = await ctx.channel.purge(limit=amount, check=check)
await ctx.respond(
embed=self.bot.create_embed(
description=f"Deleted {len(purged_messages)} message(s)."
),
ephemeral=True,
delete_after=3.0,
)
except Forbidden:
await ctx.respond(
embed=self.bot.create_embed_error(
"Missing permissions",
description="The bot does not have permission to delete messages. Please, allow the bot to delete messages and try again.",
),
ephemeral=True,
)
except Exception as exc:
await ctx.respond(
embed=self.bot.create_embed_error(
"Something went wrong",
description=f"Could not delete messages:\n```\n{exc}\n```",
),
ephemeral=True,
)
def setup(bot: PycordBot) -> None:
bot.add_cog(CogAdmin(bot))

View File

@@ -0,0 +1,99 @@
from datetime import datetime
from logging import Logger
from zoneinfo import ZoneInfo
from discord import Cog, Message
from javelina.classes.pycord_bot import PycordBot
from javelina.enums import AnalyticsEventType
from javelina.modules.database import col_analytics
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
class CogAnalytics(Cog):
def __init__(self, bot: PycordBot):
self.bot: PycordBot = bot
@Cog.listener()
async def on_message(self, message: Message) -> None:
if (
message.guild is None
or message.channel is None
or message.author is None
or message.author.bot
):
return
await col_analytics.insert_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_SENT.value,
"event_date": message.created_at,
"guild_id": message.guild.id,
"channel_id": message.channel.id,
"message_id": message.id,
"user_id": message.author.id,
"is_deleted": False,
}
)
@Cog.listener()
async def on_message_edit(self, before: Message, after: Message) -> None:
if (
after.guild is None
or after.channel is None
or after.author is None
or after.author.bot
):
return
await col_analytics.insert_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_EDITED.value,
"event_date": after.edited_at,
"guild_id": after.guild.id,
"channel_id": after.channel.id,
"message_id": after.id,
"user_id": after.author.id,
}
)
@Cog.listener()
async def on_message_delete(self, message: Message) -> None:
if (
message.guild is None
or message.channel is None
or message.author is None
or message.author.bot
):
return
await col_analytics.insert_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_DELETED.value,
"event_date": datetime.now(tz=ZoneInfo("UTC")),
"guild_id": message.guild.id,
"channel_id": message.channel.id,
"message_id": message.id,
"user_id": message.author.id,
}
)
await col_analytics.update_one(
{
"event_type": AnalyticsEventType.GUILD_MESSAGE_SENT.value,
"guild_id": message.guild.id,
"channel_id": message.channel.id,
"message_id": message.id,
},
{
"$set": {
"is_deleted": True,
}
},
)
def setup(bot: PycordBot) -> None:
bot.add_cog(CogAnalytics(bot))

View File

@@ -0,0 +1,331 @@
import logging
from datetime import datetime
from logging import Logger
from typing import Any, Dict, List
from zoneinfo import ZoneInfo
from discord import (
ApplicationContext,
Interaction,
InteractionContextType,
OptionChoice,
SlashCommandGroup,
option,
)
from discord.ext import commands
from libbot.i18n import _, in_every_locale
from tempora import parse_timedelta
from javelina.classes import Consent, PycordUser
from javelina.classes.pycord_bot import PycordBot
from javelina.enums import ConsentDuration, ConsentScope
logger: Logger = logging.getLogger(__name__)
class CogConsent(commands.Cog):
def __init__(self, client: PycordBot):
self.bot: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup(
"consent", "Consent management", contexts={InteractionContextType.guild}
)
@staticmethod
def _get_scope_choices() -> List[OptionChoice]:
choices: List[OptionChoice] = []
for scope in ConsentScope._member_map_.values():
scope_value: str = scope.value
choices.append(
OptionChoice(
_("name", "data_control", "scopes", scope_value),
scope_value,
in_every_locale("name", "data_control", "scopes", scope_value),
)
)
return choices
@staticmethod
def _get_consent_durations() -> List[OptionChoice]:
choices: List[OptionChoice] = []
for duration in ConsentDuration._member_map_.values():
duration_value: str = duration.value
choices.append(
OptionChoice(
_(duration_value, "data_control", "consent_durations"),
duration_value,
in_every_locale(duration_value, "data_control", "consent_durations"),
)
)
return choices
# /consent terms <scope>
# Will provide information about terms
# TODO Implement i18n
# TODO Implement consent duration
@command_group.command(
name="terms",
description="View terms for the consent scope",
)
@option(
"scope",
description="Scope of the consent",
choices=_get_scope_choices(),
)
async def command_consent_terms(self, ctx: ApplicationContext, scope: str) -> None:
scopes_config: Dict[str, Dict[str, Any]] = self.bot.config["modules"]["consent"][
"scopes"
]
if scope not in scopes_config:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "admin", locale=ctx.locale),
"Scope does not exist in the config!",
),
ephemeral=True,
)
return
scope_config: Dict[str, Any] = scopes_config[scope]
interaction: Interaction = await ctx.respond(
embed=self.bot.create_embed(
"Terms for {third_party_notice}**{scope_name}**".format(
third_party_notice=(
"" if not scope_config["is_third_party"] else "a third-party scope "
),
scope_name=self.bot._(
"name", "data_control", "scopes", scope, locale=ctx.locale
),
),
"Terms of use: {terms_url}\nPrivacy policy: {privacy_url}\n\nNote: Any consent given on this Discord server will be valid only for this server.".format(
terms_url=scope_config["terms_url"],
privacy_url=scope_config["privacy_url"],
),
),
ephemeral=True,
)
# /consent give <scope> [<duration>]
# Will provide information about terms and a button to confirm
# TODO Implement i18n
@command_group.command(
name="give",
description="Give consent to the scope",
)
@option(
"scope",
description="Scope of the consent",
choices=_get_scope_choices(),
)
@option(
"duration",
description="Duration of the consent",
choices=_get_consent_durations(),
)
async def command_consent_give(
self, ctx: ApplicationContext, scope: str, duration: str = ConsentDuration.NORMAL.value
) -> None:
scopes_config: Dict[str, Dict[str, Any]] = self.bot.config["modules"]["consent"][
"scopes"
]
if scope not in scopes_config:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "admin", locale=ctx.locale),
"Scope does not exist in the config!",
),
ephemeral=True,
)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
is_scope_third_party: bool = self.bot.config["modules"]["consent"]["scopes"][scope][
"is_third_party"
]
expiration_date: datetime = datetime.now(tz=ZoneInfo("UTC")) + parse_timedelta(
self.bot.config["modules"]["consent"]["durations"][
"third_party" if is_scope_third_party else "first_party"
][duration]
)
try:
await user.give_consent(ConsentScope(scope), expiration_date, cache=self.bot.cache)
await ctx.respond(
embed=self.bot.create_embed_success("Success", "Consent has been given."),
ephemeral=True,
)
except Exception as exc:
logger.error("Could not give consent due to: %s", exc, exc_info=exc)
await ctx.respond(
embed=self.bot.create_embed_error("Error", "Something went wrong!"),
ephemeral=True,
)
# /consent withdraw <scope>
# Will directly withdraw consent if confirmation is provided
# TODO Implement i18n
# TODO Implement the command
@command_group.command(
name="withdraw",
description="Withdraw consent to the scope",
)
@option(
"scope",
description="Scope of the consent",
choices=_get_scope_choices(),
)
async def command_consent_withdraw(self, ctx: ApplicationContext, scope: str) -> None:
scopes_config: Dict[str, Dict[str, Any]] = self.bot.config["modules"]["consent"][
"scopes"
]
if scope not in scopes_config:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "admin", locale=ctx.locale),
"Scope does not exist in the config!",
),
ephemeral=True,
)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
await user.withdraw_consent(ConsentScope(scope), cache=self.bot.cache)
await ctx.respond(
embed=self.bot.create_embed("Done", "The consent has been withdrawn."),
ephemeral=True,
)
# /consent give_all [<duration>] [<confirm>]
# Will inform about necessity to review all scopes and a button to confirm
# TODO Implement i18n
@command_group.command(
name="give_all",
description="Give consent to all scopes",
)
@option(
"duration",
description="Duration of the consent",
choices=_get_consent_durations(),
)
@option(
"confirm",
description="Confirmation of the action",
required=False,
)
async def command_consent_give_all(
self,
ctx: ApplicationContext,
confirm: bool = False,
duration: str = ConsentDuration.NORMAL.value,
) -> None:
if not confirm:
await ctx.respond("Operation not confirmed!", ephemeral=True)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
try:
for scope in ConsentScope._member_map_.values():
is_scope_third_party: bool = self.bot.config["modules"]["consent"]["scopes"][
scope.value
]["is_third_party"]
expiration_date: datetime = datetime.now(tz=ZoneInfo("UTC")) + parse_timedelta(
self.bot.config["modules"]["consent"]["durations"][
"third_party" if is_scope_third_party else "first_party"
][duration]
)
await user.give_consent(
ConsentScope(scope), expiration_date, cache=self.bot.cache
)
except Exception as exc:
logger.error("Could not give consent due to: %s", exc, exc_info=exc)
await ctx.respond(
embed=self.bot.create_embed_error("Error", "Something went wrong!"),
ephemeral=True,
)
return
await ctx.respond(
embed=self.bot.create_embed_success("Success", "Consent has been given."),
ephemeral=True,
)
# /consent withdraw_all [<confirm>]
# Will directly withdraw all consents if confirmation is provided
# TODO Implement i18n
@command_group.command(
name="withdraw_all",
description="Withdraw consent to all scopes",
)
@option(
"confirm",
description="Confirmation of the action",
required=False,
)
async def command_consent_withdraw_all(
self, ctx: ApplicationContext, confirm: bool = False
) -> None:
if not confirm:
await ctx.respond("Operation not confirmed!", ephemeral=True)
return
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
await user.withdraw_all_consents(cache=self.bot.cache)
await ctx.respond(
embed=self.bot.create_embed("Done", "All consents have been withdrawn."),
ephemeral=True,
)
# /consent review
# Will show all consents provided by the user, including scopes and expiration dates
# TODO Implement i18n
@command_group.command(
name="review",
description="Review all given consents",
)
async def command_consent_review(self, ctx: ApplicationContext) -> None:
user: PycordUser = await self.bot.find_user(ctx.user, ctx.guild_id)
consents: List[Consent] = await user.get_consents()
if len(consents) == 0:
await ctx.respond(
embed=self.bot.create_embed("Consents", "You have no active consents."),
ephemeral=True,
)
return
joined_consents: str = "\n".join(
[
f"`{consent.scope.value}` - Expires <t:{int(consent.expiration_date.timestamp())}>"
for consent in consents
]
)
await ctx.respond(
embed=self.bot.create_embed("Consents", f"Active consents:\n{joined_consents}"),
ephemeral=True,
)
def setup(client: PycordBot) -> None:
client.add_cog(CogConsent(client))

58
javelina/cogs/cog_data.py Normal file
View File

@@ -0,0 +1,58 @@
import logging
from logging import Logger
from discord import (
ApplicationContext,
InteractionContextType,
SlashCommandGroup,
option,
)
from discord.ext import commands
from javelina.classes.pycord_bot import PycordBot
logger: Logger = logging.getLogger(__name__)
class CogData(commands.Cog):
def __init__(self, client: PycordBot):
self.bot: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup(
"data", "Data management", contexts={InteractionContextType.guild}
)
# /data checkout
# Export all user data in a ZIP archive
# TODO Implement i18n
# TODO Implement the command
@command_group.command(
name="checkout",
description="Checkout all user data",
)
async def command_data_checkout(self, ctx: ApplicationContext) -> None:
await ctx.respond("Command is not implemented!", ephemeral=True)
# /data purge [<confirm>]
# Soft-delete all user data
# TODO Implement i18n
# TODO Implement the command
@command_group.command(
name="purge",
description="Delete all user data",
)
@option(
"confirm",
description="Confirmation of the action",
required=False,
)
async def command_data_purge(self, ctx: ApplicationContext, confirm: bool = False) -> None:
if not confirm:
await ctx.respond("Operation not confirmed!", ephemeral=True)
return
await ctx.respond("Command is not implemented!", ephemeral=True)
def setup(client: PycordBot) -> None:
client.add_cog(CogData(client))

View File

@@ -0,0 +1,14 @@
from discord import Cog
from javelina.classes.pycord_bot import PycordBot
class CogOrganizational(Cog):
"""Cog with the guessing command."""
def __init__(self, bot: PycordBot):
self.bot: PycordBot = bot
def setup(bot: PycordBot) -> None:
bot.add_cog(CogOrganizational(bot))

View File

@@ -0,0 +1,47 @@
from logging import Logger
from discord import ApplicationContext, Cog, DiscordException
from discord.ext.commands import CheckFailure
from javelina.classes.pycord_bot import PycordBot
from javelina.enums import EmbedColor
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
class CogUtility(Cog):
def __init__(self, bot: PycordBot):
self.bot: PycordBot = bot
@Cog.listener()
async def on_ready(self) -> None:
"""Listener for the event when bot connects to Discord and becomes "ready"."""
logger.info("Logged in as %s", self.bot.user)
await self.bot.set_status()
# TODO Should probably also add an error message displayed to users
@Cog.listener()
async def on_application_command_error(
self, ctx: ApplicationContext, error: DiscordException
) -> None:
if isinstance(error, CheckFailure):
await ctx.respond(
embed=self.bot.create_embed(
"Something went wrong", str(error), color=EmbedColor.ERROR
),
ephemeral=True,
)
return
logger.error(
"An error has occurred during execution of a command %s: %s",
ctx.command,
error,
exc_info=error,
)
def setup(bot: PycordBot) -> None:
bot.add_cog(CogUtility(bot))

View File

@@ -0,0 +1,95 @@
import logging
from logging import Logger
from discord import (
ApplicationContext,
InteractionContextType,
SlashCommandGroup,
User,
option,
)
from discord.ext import commands
from javelina.classes.errors import WalletInsufficientFunds
from javelina.classes.pycord_bot import PycordBot
from javelina.classes.wallet import Wallet
from javelina.enums import ConsentScope
from javelina.modules.middleware import user_consent_required
logger: Logger = logging.getLogger(__name__)
class CogWallet(commands.Cog):
def __init__(self, client: PycordBot):
self.bot: PycordBot = client
command_group: SlashCommandGroup = SlashCommandGroup(
"wallet", "Wallet management", contexts={InteractionContextType.guild}
)
@command_group.command(
name="balance",
description="View wallet's balance",
)
@option("user", description="User whose balance to check (if not your own)", required=False)
@user_consent_required(ConsentScope.GENERAL)
async def command_wallet_balance(self, ctx: ApplicationContext, user: User = None) -> None:
wallet: Wallet = await Wallet.from_id(
ctx.user.id if not user else user.id, ctx.guild_id, cache=self.bot.cache
)
await ctx.respond(
embed=self.bot.create_embed(
self.bot._("balance_title", "messages", "wallet", locale=ctx.locale),
(
self.bot._("balance_own", "messages", "wallet", locale=ctx.locale).format(
balance=wallet.balance
)
if user is None
else self.bot._(
"balance_user", "messages", "wallet", locale=ctx.locale
).format(balance=wallet.balance, user=user.display_name)
),
)
)
@command_group.command(
name="transfer",
description="View wallet's balance",
)
@option("user", description="Recipient")
@option("amount", description="Amount", min_value=0.01)
async def command_wallet_transfer(
self, ctx: ApplicationContext, user: User, amount: float
) -> None:
amount = round(amount, 2)
# Guild will be needed for overdraft options
# guild: PycordGuild = await PycordGuild.from_id(ctx.guild_id)
wallet: Wallet = await Wallet.from_id(ctx.user.id, ctx.guild_id, cache=self.bot.cache)
try:
await wallet.transfer(user.id, ctx.guild_id, amount)
except WalletInsufficientFunds:
await ctx.respond(
embed=self.bot.create_embed_error(
self.bot._("error_title", "messages", "general", locale=ctx.locale),
self.bot._(
"transfer_insufficient_funds", "messages", "wallet", locale=ctx.locale
).format(amount=round(abs(wallet.balance - amount), 2)),
)
)
return
await ctx.respond(
embed=self.bot.create_embed_success(
self.bot._("success_title", "messages", "general", locale=ctx.locale),
self.bot._("transfer_success", "messages", "wallet", locale=ctx.locale).format(
amount=amount, recipient=user.display_name
),
)
)
def setup(client: PycordBot) -> None:
client.add_cog(CogWallet(client))

View File

@@ -0,0 +1,9 @@
from .analytics_event_type import AnalyticsEventType
from .cache_ttl import CacheTTL
from .consent_duration import ConsentDuration
from .consent_scope import ConsentScope
from .embed_color import EmbedColor
from .health_status import HealthStatus
from .message_events import MessageEvents
from .punishment import Punishment
from .scheduled_action_type import ScheduledActionType

View File

@@ -0,0 +1,7 @@
from enum import Enum
class AnalyticsEventType(Enum):
GUILD_MESSAGE_SENT = "guild_message_sent"
GUILD_MESSAGE_EDITED = "guild_message_edited"
GUILD_MESSAGE_DELETED = "guild_message_deleted"

View File

@@ -0,0 +1,7 @@
from enum import Enum
class CacheTTL(Enum):
SHORT = 300
NORMAL = 3600
LONG = 86400

View File

@@ -0,0 +1,7 @@
from enum import Enum
class ConsentDuration(Enum):
SHORT = "short"
NORMAL = "normal"
LONG = "long"

View File

@@ -0,0 +1,6 @@
from enum import Enum
class ConsentScope(Enum):
GENERAL = "general"
INTEGRATION_DEEPL = "integration_deepl"

View File

@@ -0,0 +1,9 @@
from enum import Enum
class EmbedColor(Enum):
PRIMARY = "primary"
SECONDARY = "secondary"
SUCCESS = "success"
WARNING = "warning"
ERROR = "error"

View File

@@ -0,0 +1,8 @@
from enum import Enum
class HealthStatus(Enum):
OPERATIONAL = "operational"
DEGRADED = "degraded"
FAILED = "failed"
UNKNOWN = "unknown"

View File

@@ -0,0 +1,8 @@
from enum import Enum
class Punishment(Enum):
WARNING = 0
MUTE = 1
KICK = 2
BAN = 3

View File

@@ -0,0 +1,5 @@
from enum import Enum
class ScheduledActionType(Enum):
WEATHER_REPORT = 0

View File

View File

@@ -0,0 +1 @@
from . import database, migrator, scheduler, utils

View File

@@ -2,8 +2,10 @@
from typing import Any, Mapping
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase
from libbot.utils import config_get
from pymongo import AsyncMongoClient
from pymongo.asynchronous.collection import AsyncCollection
from pymongo.asynchronous.database import AsyncDatabase
db_config: Mapping[str, Any] = config_get("database")
@@ -21,11 +23,18 @@ else:
)
# Async declarations
db_client = AsyncClient(con_string)
db_client = AsyncMongoClient(con_string, connectTimeoutMS=3000)
db: AsyncDatabase = db_client.get_database(name=db_config["name"])
col_users: AsyncCollection = db.get_collection("users")
col_guilds: AsyncCollection = db.get_collection("guilds")
col_wallets: AsyncCollection = db.get_collection("wallets")
col_consents: AsyncCollection = db.get_collection("consents")
col_analytics: AsyncCollection = db.get_collection("analytics")
col_custom_channels: AsyncCollection = db.get_collection("custom_channels")
col_scheduled_actions: AsyncCollection = db.get_collection("scheduled_actions")
# col_messages: AsyncCollection = db.get_collection("messages")
# col_warnings: AsyncCollection = db.get_collection("warnings")
# col_checkouts: AsyncCollection = db.get_collection("checkouts")
@@ -33,6 +42,17 @@ col_wallets: AsyncCollection = db.get_collection("wallets")
# col_authorized: AsyncCollection = db.get_collection("authorized")
# col_transactions: AsyncCollection = db.get_collection("transactions")
# Update indexes
db.dispatch.get_collection("users").create_index("id", unique=True)
db.dispatch.get_collection("wallets").create_index(["owner_id", "guild_id"], unique=False)
async def _update_database_indexes() -> None:
await col_users.create_index(["id", "guild_id"], name="user_id-guild_id", unique=True)
await col_guilds.create_index("guild_id", name="guild_id", unique=True)
await col_wallets.create_index(
["owner_id", "guild_id"], name="owner_id-guild_id", unique=True
)
await col_consents.create_index(
["owner_id", "guild_id", "scope"], name="owner_id-guild_id-scope", unique=False
)
await col_custom_channels.create_index(
["owner_id", "guild_id", "channel_id"], name="owner_id-guild_id-channel_id", unique=True
)

View File

@@ -0,0 +1 @@
from .data_control import user_consent_required

View File

@@ -0,0 +1,35 @@
from logging import Logger
from typing import Callable, List
from discord import ApplicationContext
from discord.ext import commands
from discord.ext.commands import CheckFailure, Context
from discord.ext.commands.context import T
from javelina.classes import PycordUser
from javelina.enums import ConsentScope
from javelina.modules.utils import get_logger
logger: Logger = get_logger(__name__)
def user_consent_required(*required_scopes: ConsentScope) -> Callable[[T], T]:
async def predicate(ctx: Context | ApplicationContext) -> True:
user: PycordUser = await ctx.bot.find_user(ctx.author.id, ctx.guild.id)
for required_scope in required_scopes:
if not await user.has_active_consent(required_scope):
localized_scopes: List[str] = [
ctx.bot._("name", "data_control", "scopes", scope.value, locale=ctx.locale)
for scope in required_scopes
]
raise CheckFailure(
ctx.bot._(
"consent_required", "messages", "data_control", locale=ctx.locale
).format(scopes=f"`{'`, `'.join(localized_scopes)}`")
)
return True
return commands.check(predicate)

View File

@@ -16,6 +16,7 @@ def migrate_database() -> None:
"mongo_database": db_config["name"],
"mongo_username": db_config["user"],
"mongo_password": db_config["password"],
"mongo_migrations_path": "javelina/migrations",
}
)
manager = MigrationManager(manager_config)

View File

@@ -0,0 +1,3 @@
from .cache_utils import restore_from_cache
from .logging_utils import get_logger, get_logging_config
from .router_loader import run_router_setups

View File

@@ -0,0 +1,10 @@
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
def restore_from_cache(
cache_prefix: str, cache_key: str | int | ObjectId, cache: Optional[Cache] = None
) -> Dict[str, Any] | None:
return None if cache is None else cache.get_json(f"{cache_prefix}_{cache_key}")

View File

@@ -0,0 +1,35 @@
import logging
from logging import Logger
from pathlib import Path
from typing import Any, Dict
from libbot.utils import config_get
def get_logging_config() -> Dict[str, Any]:
return {
"version": 1,
"disable_existing_loggers": False,
"handlers": {
"file": {
"class": "logging.handlers.RotatingFileHandler",
"filename": str(Path("logs/latest.log")),
"maxBytes": 500000,
"backupCount": 10,
"formatter": "simple",
},
"console": {"class": "logging.StreamHandler", "formatter": "systemd"},
},
"formatters": {
"simple": {"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"},
"systemd": {"format": "%(name)s - %(levelname)s - %(message)s"},
},
"root": {
"level": "DEBUG" if config_get("debug") else "INFO",
"handlers": ["file", "console"],
},
}
def get_logger(name: str) -> Logger:
return logging.getLogger(name)

View File

@@ -0,0 +1,19 @@
import importlib
from pkgutil import iter_modules
from types import ModuleType
from fastapi import FastAPI
def run_router_setups(app: FastAPI, package: str) -> None:
package_path: str = package.replace(".", "/")
for _, name, is_package in iter_modules([package_path]):
full_module: str = f"{package}.{name}"
module: ModuleType = importlib.import_module(full_module)
if hasattr(module, "setup"):
module.setup(app)
if is_package:
run_router_setups(app, full_module)

View File

@@ -1,17 +1,17 @@
from datetime import datetime
import logging
from datetime import datetime
from typing import Any, Dict, List
from database import col_messages
from discord import Embed
from pymongo import DESCENDING
from pytz import timezone
from classes.enums import MessageEvents
from classes.pycordbot import PycordBot
from ujson import loads
from modules.utils import hex_to_int
from database import col_messages
from modules.weather.parser import parse_weather
from javelina.classes.pycordbot import PycordBot
from javelina.enums import MessageEvents
from javelina.modules.utils import hex_to_int
from javelina.modules.weather.parser import parse_weather
# Example guild key
# "forecast": {
@@ -71,9 +71,9 @@ async def report_weather(
# Results must be parsed and added as embeds to the embeds lits.
for location in locations:
location_timezone_offset = ":".join(
str(
timezone(bot.config["bot"]["timezone"]).utcoffset(datetime.utcnow())
).split(":")[:2]
str(timezone(bot.config["bot"]["timezone"]).utcoffset(datetime.utcnow())).split(
":"
)[:2]
)
api_response = await (

View File

@@ -1,4 +1,73 @@
{
"messages": {
"general": {
"permission_denied": "You're not allowed to perform this operation.",
"permission_denied_title": "Permission denied",
"error_title": "Something went wrong",
"success_title": "Success",
"warning_title": "Warning"
},
"admin": {
"config_reload_success": "Bot's configuration has been reloaded.",
"config_reload_error": "Could not reload the bot's configuration:\n```\n{error}\n```"
},
"data_control": {
"consent_required": "Consent to following scope(s) is required to access the command: {scopes}\n\nYou can use following commands to manage your consents:\n`/consent terms <scope>` - Review the terms for a consent\n`/consent give <scope>` - Give consent for a scope"
},
"wallet": {
"balance_title": "Balance",
"balance_own": "Your balance is `{balance}`.",
"balance_user": "**{user}**'s balance is `{balance}`.",
"transfer_success": "You have transferred `{amount}` to **{recipient}**.",
"transfer_insufficient_funds": "Insufficient funds. `{amount}` more is needed for this transaction."
},
"reload": {
"reload_success": "Configuration has been successfully reloaded.",
"reload_failure": "Could not reload the configuration:\n```\n{exception}\n```"
},
"welcome": {
"morning": [
"{0} Добрий ранок та ласкаво просимо! {1}"
],
"midday": [
"{0} Добрий день! Ласкаво просимо! {1}"
],
"evening": [
"{0} Добрий вечір! Ласкаво просимо! {1}"
],
"night": [
"{0} Доброї ночі! Ласкаво просимо! {1}"
],
"unknown": [
"{0} Вітаннячко! Ласкаво просимо! {1}"
]
}
},
"data_control": {
"scopes": {
"general": {
"name": "General",
"description": ""
},
"integration_deepl": {
"name": "Integration: DeepL",
"description": ""
}
},
"consent_durations": {
"short": "Short",
"normal": "Normal",
"long": "Long"
}
},
"commands": {
"reload": {
"description": "Reload bot's configuration"
},
"clear": {
"description": "Delete some messages in the current channel"
}
},
"actions": {
"bite": {
"name": "Вкусити",
@@ -33,46 +102,25 @@
"text": "**{user_name}** підморгує **{target_name}**"
}
},
"messages": {
"welcome": {
"morning": [
"{0} Добрий ранок та ласкаво просимо! {1}",
"{0} Доброго ранку та ласкаво просимо! {1}",
"{0} Вітаннячко! Ласкаво просимо! {1}",
"{0} Доброго ранку! Ласкаво просимо! {1}"
],
"midday": [
"{0} Добрий день! Ласкаво просимо! {1}",
"{0} Добридень! Ласкаво просимо! {1}",
"{0} День добрий! Ласкаво просимо! {1}",
"{0} Мої вітання! Ласкаво просимо! {1}",
"{0} Здоровенькі були! Ласкаво просимо! {1}",
"{0} Раді вітати вас! Ласкаво просимо! {1}",
"{0} Доброго здоров’ячка! Ласкаво просимо! {1}"
],
"evening": [
"{0} Добрий вечір! Ласкаво просимо! {1}",
"{0} Доброго вечора! Ласкаво просимо! {1}",
"{0} Добривечір! Ласкаво просимо! {1}",
"{0} Доброго вечора та ласкаво просимо! {1}",
"{0} Добрий вечір та ласкаво просимо! {1}"
],
"night": [
"{0} Доброї ночі! Ласкаво просимо! {1}",
"{0} Здоровенькі були! Ласкаво просимо! {1}"
],
"unknown": [
"{0} Вітаннячко! Ласкаво просимо! {1}"
]
}
},
"tracking": {
"dhl": {
"statuses": {
"delivered": "Доставлено",
"transit": "Транзит",
"pre-transit": "Пре-транзит",
"failure": "Невдача"
"AA": "Departed from the hub",
"AE": "Pickup successful",
"AN": "Pickup not successful",
"BV": "Exception occurred",
"DD": "Data service",
"EE": "Arrived at the hub",
"ES": "First processed by DHL",
"GT": "Money transfer",
"LA": "In storage",
"NB": "Processing during transit",
"PO": "In delivery",
"VA": "Electronic pre-advise",
"ZF": "Delivery",
"ZN": "Delivery not successful",
"ZO": "Customs clearance",
"ZU": "Delivery successful"
},
"messages": {
"DHL PAKET (parcel)": "DHL PAKET (посилка)",

View File

@@ -1,43 +1,73 @@
{
"messages": {
"general": {
"permission_denied": "You're not allowed to perform this operation.",
"permission_denied_title": "Permission denied",
"error_title": "Something went wrong",
"success_title": "Success",
"warning_title": "Warning"
},
"admin": {
"config_reload_success": "Bot's configuration has been reloaded.",
"config_reload_error": "Could not reload the bot's configuration:\n```\n{error}\n```"
},
"data_control": {
"consent_required": "Consent to following scope(s) is required to access the command: {scopes}\n\nYou can use following commands to manage your consents:\n`/consent terms <scope>` - Review the terms for a consent\n`/consent give <scope>` - Give consent for a scope"
},
"wallet": {
"balance_title": "Balance",
"balance_own": "Your balance is `{balance}`.",
"balance_user": "**{user}**'s balance is `{balance}`.",
"transfer_success": "You have transferred `{amount}` to **{recipient}**.",
"transfer_insufficient_funds": "Insufficient funds. `{amount}` more is needed for this transaction."
},
"reload": {
"reload_success": "Configuration has been successfully reloaded.",
"reload_failure": "Could not reload the configuration:\n```\n{exception}\n```"
},
"welcome": {
"morning": [
"{0} Good morning and welcome! {1}",
"{0} Доброго ранку та ласкаво просимо! {1}",
"{0} Вітаннячко! Ласкаво просимо! {1}",
"{0} Доброго ранку! Ласкаво просимо! {1}"
"{0} Good morning and welcome! {1}"
],
"midday": [
"{0} Good day and welcome! {1}",
"{0} Добридень! Ласкаво просимо! {1}",
"{0} День добрий! Ласкаво просимо! {1}",
"{0} Мої вітання! Ласкаво просимо! {1}",
"{0} Здоровенькі були! Ласкаво просимо! {1}",
"{0} Раді вітати вас! Ласкаво просимо! {1}",
"{0} Доброго здоров’ячка! Ласкаво просимо! {1}"
"{0} Good day and welcome! {1}"
],
"evening": [
"{0} Good evening and welcome! {1}",
"{0} Доброго вечора! Ласкаво просимо! {1}",
"{0} Добривечір! Ласкаво просимо! {1}",
"{0} Доброго вечора та ласкаво просимо! {1}",
"{0} Добрий вечір та ласкаво просимо! {1}"
"{0} Good evening and welcome! {1}"
],
"night": [
"{0} Good night and welcome! {1}",
"{0} Здоровенькі були! Ласкаво просимо! {1}"
"{0} Good night and welcome! {1}"
],
"unknown": [
"{0} Hello and welcome! {1}"
]
}
},
"data_control": {
"scopes": {
"general": {
"name": "General",
"description": ""
},
"integration_deepl": {
"name": "Integration: DeepL",
"description": ""
}
},
"consent_durations": {
"short": "Short",
"normal": "Normal",
"long": "Long"
}
},
"commands": {
"reload": {
"description": "Reload bot's configuration"
},
"clear": {
"description": "Delete some messages in the current channel"
}
},
"actions": {
"bite": {
"name": "Bite",
@@ -75,10 +105,22 @@
"tracking": {
"dhl": {
"statuses": {
"delivered": "Delivered",
"transit": "Transit",
"pre-transit": "Pre-transit",
"failure": "Failure"
"AA": "Departed from the hub",
"AE": "Pickup successful",
"AN": "Pickup not successful",
"BV": "Exception occurred",
"DD": "Data service",
"EE": "Arrived at the hub",
"ES": "First processed by DHL",
"GT": "Money transfer",
"LA": "In storage",
"NB": "Processing during transit",
"PO": "In delivery",
"VA": "Electronic pre-advise",
"ZF": "Delivery",
"ZN": "Delivery not successful",
"ZO": "Customs clearance",
"ZU": "Delivery successful"
},
"messages": {
"DHL PAKET (parcel)": "DHL PAKET (parcel)",

View File

@@ -1,11 +1,30 @@
{
"messages": {
"general": {
"permission_denied": "Ви не маєте права виконувати цю операцію.",
"permission_denied_title": "Permission denied",
"error_title": "Something went wrong",
"success_title": "Success",
"warning_title": "Warning"
},
"admin": {
"config_reload_success": "Конфігурацію бота було перезавантажено.",
"config_reload_error": "Не вдалося перезавантажити конфігурацію бота:\n```\n{error}\n```"
},
"data_control": {
"consent_required": "Для доступу до команди необхідна згода на наступні сфери застосування: {scopes}\n\nВи можете використовувати наступні команди для управління своїми згодами:\n`/consent terms <scope>` - Переглянути умови згоди\n`/consent give <scope>` - Надати згоду на певну сферу застосування"
},
"wallet": {
"balance_title": "Balance",
"balance_own": "Ваш баланс складає `{balance}`.",
"balance_user": "Баланс **{user}** складає `{balance}`.",
"transfer_success": "Ви перевели `{amount}` на рахунок **{recipient}**.",
"transfer_insufficient_funds": "Недостатньо коштів. Потрібно ще `{amount}` для цієї транзакції."
},
"reload": {
"reload_success": "Конфігурацію було успішно перезавантажено.",
"reload_failure": "Не вдалось перезавантажити конфігурацію:\n```\n{exception}\n```"
},
"welcome": {
"morning": [
"{0} Добрий ранок та ласкаво просимо! {1}",
@@ -38,6 +57,31 @@
]
}
},
"data_control": {
"scopes": {
"general": {
"name": "Загальне",
"description": ""
},
"integration_deepl": {
"name": "Інтеграція: DeepL",
"description": ""
}
},
"consent_durations": {
"short": "Короткий",
"normal": "Звичаний",
"long": "Довгий"
}
},
"commands": {
"reload": {
"description": "Перезавантажити конфігурацію бота"
},
"clear": {
"description": "Delete some messages in the current channel"
}
},
"actions": {
"bite": {
"name": "Вкусити",
@@ -75,10 +119,22 @@
"tracking": {
"dhl": {
"statuses": {
"delivered": "Доставлено",
"transit": "Транзит",
"pre-transit": "Пре-транзит",
"failure": "Невдача"
"AA": "Departed from the hub",
"AE": "Pickup successful",
"AN": "Pickup not successful",
"BV": "Exception occurred",
"DD": "Data service",
"EE": "Arrived at the hub",
"ES": "First processed by DHL",
"GT": "Money transfer",
"LA": "In storage",
"NB": "Processing during transit",
"PO": "In delivery",
"VA": "Electronic pre-advise",
"ZF": "Delivery",
"ZN": "Delivery not successful",
"ZO": "Customs clearance",
"ZU": "Delivery successful"
},
"messages": {
"DHL PAKET (parcel)": "DHL PAKET (посилка)",

44
main.py
View File

@@ -1,24 +1,26 @@
import asyncio
import contextlib
import logging
import logging.config
from asyncio import get_event_loop
from logging import Logger
from os import getpid
from os import getpid, makedirs
from pathlib import Path
from sys import exit
from discord import LoginFailure
from libbot.utils import config_get
# Import required for uvicorn
from api.app import app # noqa
from classes.pycord_bot import PycordBot
from modules.extensions_loader import dynamic_import_from_src
from modules.scheduler import scheduler
from javelina.api.app import app # noqa
from javelina.classes.pycord_bot import PycordBot
from javelina.modules.scheduler import scheduler
from javelina.modules.utils import get_logger, get_logging_config
logging.basicConfig(
level=logging.DEBUG if config_get("debug") else logging.INFO,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]",
)
makedirs(Path("logs/"), exist_ok=True)
logger: Logger = logging.getLogger(__name__)
logging.config.dictConfig(get_logging_config())
logger: Logger = get_logger(__name__)
# Try to import the module that improves performance
# and ignore errors when module is not installed
@@ -28,19 +30,27 @@ with contextlib.suppress(ImportError):
uvloop.install()
async def main():
bot = PycordBot(scheduler=scheduler)
async def main() -> None:
bot: PycordBot = PycordBot(scheduler=scheduler)
bot.load_extension("cogs")
bot.load_extension("javelina.cogs")
# Import API modules
dynamic_import_from_src("api/extensions", star_import=True)
app.set_bot(bot)
try:
await bot.start(config_get("bot_token", "bot"))
except LoginFailure as exc:
logger.error("Provided bot token is invalid: %s", exc)
except KeyboardInterrupt:
logger.warning("Forcefully shutting down with PID %s...", getpid())
await bot.close()
except Exception as exc:
logger.error("An unexpected error has occurred: %s", exc, exc_info=exc)
exit(1)
asyncio.create_task(main())
if __name__ == "__main__":
event_loop = get_event_loop()
event_loop.run_until_complete(main())

View File

@@ -1,3 +0,0 @@
from modules.migrator import migrate_database
migrate_database()

View File

@@ -1,74 +0,0 @@
import logging
from importlib.util import module_from_spec, spec_from_file_location
from os import getcwd, walk
from pathlib import Path
from types import ModuleType
from typing import List
logger = logging.getLogger(__name__)
# Import functions
# Took from https://stackoverflow.com/a/57892961
def get_py_files(src: str | Path) -> List[str]:
cwd = getcwd() # Current Working directory
py_files = []
for root, dirs, files in walk(src):
py_files.extend(Path(f"{cwd}/{root}/{file}") for file in files if file.endswith(".py"))
return py_files
def dynamic_import(module_name: str, py_path: str) -> ModuleType | None:
try:
module_spec = spec_from_file_location(module_name, py_path)
if module_spec is None:
raise RuntimeError(
f"Module spec from module name {module_name} and path {py_path} is None"
)
module = module_from_spec(module_spec)
if module_spec.loader is None:
logger.warning(
"Could not load extension %s due to spec loader being None.",
module_name,
)
return None
module_spec.loader.exec_module(module)
return module
except SyntaxError:
logger.warning(
"Could not load extension %s due to invalid syntax. Check logs/errors.log for details.",
module_name,
)
return None
except Exception as exc:
logger.warning("Could not load extension %s due to %s", module_name, exc)
return None
def dynamic_import_from_src(src: str | Path, star_import=False) -> None:
my_py_files = get_py_files(src)
for py_file in my_py_files:
module_name = Path(py_file).stem
logger.debug("Importing %s extension...", module_name)
imported_module = dynamic_import(module_name, py_file)
if imported_module is not None:
if star_import:
for obj in dir(imported_module):
globals()[obj] = imported_module.__dict__[obj]
else:
globals()[module_name] = imported_module
logger.info("Successfully loaded %s extension", module_name)
return

View File

@@ -1,6 +1,38 @@
[project]
name = "Javelina"
version = "0.0.1"
dynamic = ["dependencies", "optional-dependencies"]
description = "Discord bot that manages the server and provides an additional RESTful API"
readme = "README.md"
license = "AGPL-3.0"
license-files = ["LICENSE"]
requires-python = ">=3.11"
[tool.setuptools.dynamic]
dependencies = { file = "requirements.txt" }
[tool.setuptools.dynamic.optional-dependencies]
dev = { file = "requirements/dev.txt" }
[tool.setuptools.packages.find]
where = ["javelina"]
[tool.black]
line-length = 96
target-version = ["py311"]
target-version = ["py311", "py312", "py313"]
[tool.isort]
profile = "black"
profile = "black"
[tool.mypy]
namespace_packages = true
install_types = true
strict = true
show_error_codes = true
[tool.pylint]
disable = ["line-too-long"]
[tool.pylint.main]
extension-pkg-whitelist = ["ujson"]
py-version = 3.11

View File

@@ -1,11 +1,16 @@
aiohttp>=3.6.0
aiohttp>=3.9.2
apscheduler~=3.11.0
async_pymongo==0.1.11
colorthief==0.2.1
deepl==1.22.0
fastapi[all]~=0.115.0
libbot[speed,pycord,cache]==4.2.0
fastapi[all]~=0.115.5,<0.116.0
fastapi_discord==0.2.7
libbot[speed,pycord,cache]==4.4.0
mongodb-migrations==1.3.1
pynacl~=1.5.0
pyrmv==0.5.0
pytz~=2025.1
pytz~=2025.1
tempora~=5.8.1
typer~=0.17.1
# Temporarily disabled because
# these are still unused for now
# colorthief==0.2.1
# deepl==1.22.0
# pyrmv==0.5.0

4
requirements/dev.txt Normal file
View File

@@ -0,0 +1,4 @@
black==25.1.0
isort==5.13.2
mypy==1.16.1
pylint==3.3.7

1900
uv.lock generated Normal file

File diff suppressed because it is too large Load Diff