Initial commit with code

This commit is contained in:
2025-04-16 18:39:03 +02:00
parent fcaa3263cb
commit cf0d4bd18f
24 changed files with 576 additions and 18 deletions

24
.gitignore vendored
View File

@@ -95,12 +95,6 @@ ipython_config.py
# install all needed dependencies. # install all needed dependencies.
#Pipfile.lock #Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry # poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. # Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more # This is especially recommended for binary packages to ensure reproducibility, and is more
@@ -108,17 +102,7 @@ ipython_config.py
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control # https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock #poetry.lock
# pdm # PEP 582; used by e.g. github.com/David-OConnor/pyflow
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/ __pypackages__/
# Celery stuff # Celery stuff
@@ -162,9 +146,13 @@ dmypy.json
cython_debug/ cython_debug/
# PyCharm # PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can # JetBrains specific template is maintainted in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear # and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/ #.idea/
# Custom
.vscode
config.json
logs/

21
.renovaterc Normal file
View File

@@ -0,0 +1,21 @@
{
"$schema": "https://docs.renovatebot.com/renovate-schema.json",
"extends": [
"config:base"
],
"baseBranches": [
"dev"
],
"packageRules": [
{
"matchUpdateTypes": [
"minor",
"patch",
"pin",
"digest"
],
"automerge": true
}
]
}

2
classes/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
from .pycord_guild import PycordGuild
from .pycord_user import PycordUser

View File

View File

@@ -0,0 +1,2 @@
from .pycord_guild import GuildNotFoundError
from .pycord_user import UserNotFoundError

View File

@@ -0,0 +1,7 @@
class GuildNotFoundError(Exception):
"""PycordGuild could not find guild with such an ID in the database"""
def __init__(self, guild_id: int) -> None:
self.guild_id = guild_id
super().__init__(f"Guild with id {self.guild_id} was not found")

View File

@@ -0,0 +1,7 @@
class UserNotFoundError(Exception):
"""PycordUser could not find user with such an ID in the database"""
def __init__(self, user_id: int) -> None:
self.user_id = user_id
super().__init__(f"User with id {self.user_id} was not found")

71
classes/pycord_bot.py Normal file
View File

@@ -0,0 +1,71 @@
import logging
from logging import Logger
from typing import Any
from aiohttp import ClientSession
from discord import User
from libbot.cache.manager import create_cache_client
from libbot.pycord.classes import PycordBot as LibPycordBot
from classes import PycordUser
logger: Logger = logging.getLogger(__name__)
# from modules.tracking.dhl import update_tracks_dhl
class PycordBot(LibPycordBot):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._set_cache_engine()
self.client_session = ClientSession()
if self.scheduler is None:
return
# This replacement exists because of the different
# i18n formats than provided by libbot
self._ = self._modified_string_getter
def _modified_string_getter(self, key: str, *args: str, locale: str | None = None) -> Any:
"""This method exists because of the different i18n formats than provided by libbot.
It splits "-" and takes the first part of the provided locale to make complex language codes
compatible with an easy libbot approach to i18n.
"""
return self.bot_locale._(key, *args, locale=None if locale is None else locale.split("-")[0])
def _set_cache_engine(self) -> None:
if "cache" in self.config and self.config["cache"]["type"] is not None:
self.cache = create_cache_client(self.config, self.config["cache"]["type"])
async def find_user(self, user: int | User) -> PycordUser:
"""Find User by its ID or User object.
Args:
user (int | User): ID or User object to extract ID from
Returns:
PycordUser: User object
Raises:
UserNotFoundException: User was not found and creation was not allowed
"""
return (
await PycordUser.from_id(user, cache=self.cache)
if isinstance(user, int)
else await PycordUser.from_id(user.id, cache=self.cache)
)
async def start(self, *args: Any, **kwargs: Any) -> None:
await super().start(*args, **kwargs)
async def close(self, **kwargs) -> None:
await self.client_session.close()
if self.scheduler is not None:
self.scheduler.shutdown()
await super().close(**kwargs)

47
classes/pycord_guild.py Normal file
View File

@@ -0,0 +1,47 @@
from dataclasses import dataclass
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
@dataclass
class PycordGuild:
_id: ObjectId
id: int
def __init__(self) -> None:
raise NotImplementedError()
@classmethod
async def from_id(
cls, guild_id: int, allow_creation: bool = True, cache: Optional[Cache] = None
) -> "PycordGuild":
"""Find guild in database and create new record if guild does not exist.
Args:
guild_id (int): User's Discord ID
allow_creation (:obj:`bool`, optional): Create new guild record if none found in the database
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
PycordGuild: User object
Raises:
GuildNotFoundError: User was not found and creation was not allowed
"""
raise NotImplementedError()
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordGuild object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordGuild
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
}

158
classes/pycord_user.py Normal file
View File

@@ -0,0 +1,158 @@
import logging
from dataclasses import dataclass
from logging import Logger
from typing import Any, Dict, Optional
from bson import ObjectId
from libbot.cache.classes import Cache
from pymongo.results import InsertOneResult
from classes.errors.pycord_user import UserNotFoundError
from modules.database import col_users
logger: Logger = logging.getLogger(__name__)
@dataclass
class PycordUser:
"""Dataclass of DB entry of a user"""
__slots__ = ("_id", "id")
_id: ObjectId
id: int
@classmethod
async def from_id(
cls, user_id: int, allow_creation: bool = True, cache: Optional[Cache] = None
) -> "PycordUser":
"""Find user in database and create new record if user does not exist.
Args:
user_id (int): User's Discord ID
allow_creation (:obj:`bool`, optional): Create new user record if none found in the database
cache (:obj:`Cache`, optional): Cache engine to get the cache from
Returns:
PycordUser: User object
Raises:
UserNotFoundError: User was not found and creation was not allowed
"""
if cache is not None:
cached_entry: Dict[str, Any] | None = cache.get_json(f"user_{user_id}")
if cached_entry is not None:
return cls(**cached_entry)
db_entry = await col_users.find_one({"id": user_id})
if db_entry is None:
if not allow_creation:
raise UserNotFoundError(user_id)
db_entry = PycordUser.get_defaults(user_id)
insert_result: InsertOneResult = await col_users.insert_one(db_entry)
db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(f"user_{user_id}", db_entry)
return cls(**db_entry)
def to_dict(self, json_compatible: bool = False) -> Dict[str, Any]:
"""Convert PycordUser object to a JSON representation.
Args:
json_compatible (bool): Whether the JSON-incompatible objects like ObjectId need to be converted
Returns:
Dict[str, Any]: JSON representation of PycordUser
"""
return {
"_id": self._id if not json_compatible else str(self._id),
"id": self.id,
}
async def _set(self, key: str, value: Any, cache: Optional[Cache] = None) -> None:
"""Set attribute data and save it into the database.
Args:
key (str): Attribute to change
value (Any): Value to set
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await col_users.update_one({"_id": self._id}, {"$set": {key: value}}, upsert=True)
self._update_cache(cache)
logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value)
async def _remove(self, key: str, cache: Optional[Cache] = None) -> None:
"""Remove attribute data and save it into the database.
Args:
key (str): Attribute to remove
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
default_value: Any = PycordUser.get_default_value(key)
setattr(self, key, default_value)
await col_users.update_one({"_id": self._id}, {"$set": {key: default_value}}, upsert=True)
self._update_cache(cache)
logger.info("Removed attribute '%s' of user %s", key, self.id)
def _get_cache_key(self) -> str:
return f"user_{self.id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
user_dict: Dict[str, Any] = self.to_dict()
if user_dict is not None:
cache.set_json(self._get_cache_key(), user_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
@staticmethod
def get_defaults(user_id: Optional[int] = None) -> Dict[str, Any]:
return {
"id": user_id,
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in PycordUser.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in PycordUser")
return PycordUser.get_defaults()[key]
async def purge(self, cache: Optional[Cache] = None) -> None:
"""Completely remove user data from database. Currently only removes the user record from users collection.
Args:
cache (:obj:`Cache`, optional): Cache engine to write the update into
"""
await col_users.delete_one({"_id": self._id})
self._delete_cache(cache)

43
cli.py Normal file
View File

@@ -0,0 +1,43 @@
import logging
from argparse import ArgumentParser
from modules.migrator import migrate_database
# from modules.migrator import downgrade_database
logging.basicConfig(
level=logging.INFO,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]",
)
logger = logging.getLogger(__name__)
parser: ArgumentParser = ArgumentParser(
prog="Javelina",
description="Discord bot for community management.",
)
parser.add_argument("--migrate", action="store_true")
# parser.add_argument("--downgrade", action="store_true")
parser.add_argument("--confirm", action="store_true")
args = parser.parse_args()
if args.migrate:
logger.info("Migrating...")
migrate_database()
exit()
# if args.downgrade:
# if not args.confirm:
# logger.warning(
# "Argument --downgrade has been provided but this operation may include irreversible changes and destroy existing data in both database and configuration. It's not recommended to do this without backing everything up. Retry this command providing an additional argument --confirm to accept the risks and explicitly execute the downgrade."
# )
# exit(1)
#
# logger.info("Downgrading...")
# downgrade_database()
# exit()
logger.info("No arguments have been provided, not doing anything.")

35
config_example.json Normal file
View File

@@ -0,0 +1,35 @@
{
"locale": "en",
"debug": false,
"bot": {
"owners": [
0
],
"debug_guilds": [
0
],
"bot_token": "",
"timezone": "UTC",
"status": {
"enabled": true,
"activity_type": 0,
"activity_text": "The Game Of Life"
}
},
"database": {
"user": null,
"password": null,
"host": "127.0.0.1",
"port": 27017,
"name": "quiz_bot"
},
"cache": {
"type": null,
"memcached": {
"uri": "127.0.0.1:11211"
},
"redis": {
"uri": "redis://127.0.0.1:6379/0"
}
}
}

3
locale/en.json Normal file
View File

@@ -0,0 +1,3 @@
{
"messages": {}
}

3
locale/uk.json Normal file
View File

@@ -0,0 +1,3 @@
{
"messages": {}
}

42
main.py Normal file
View File

@@ -0,0 +1,42 @@
import asyncio
import contextlib
import logging.config
from logging import Logger
from os import getpid, makedirs
from pathlib import Path
# Import required for uvicorn
from libbot.utils import config_get
from classes.pycord_bot import PycordBot
from modules.logging_utils import get_logging_config, get_logger
from modules.scheduler import scheduler
makedirs(Path("logs/"), exist_ok=True)
logging.config.dictConfig(get_logging_config())
logger: Logger = get_logger(__name__)
# Try to import the module that improves performance
# and ignore errors when module is not installed
with contextlib.suppress(ImportError):
import uvloop
uvloop.install()
async def main():
bot = PycordBot(scheduler=scheduler)
bot.load_extension("cogs")
try:
await bot.start(config_get("bot_token", "bot"))
except KeyboardInterrupt:
logger.warning("Forcefully shutting down with PID %s...", getpid())
await bot.close()
if __name__ == "__main__":
asyncio.create_task(main())

3
migrate.py Normal file
View File

@@ -0,0 +1,3 @@
from modules.migrator import migrate_database
migrate_database()

0
migrations/.gitkeep Normal file
View File

30
modules/database.py Normal file
View File

@@ -0,0 +1,30 @@
"""Module that provides all database collections"""
from typing import Any, Mapping
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase
from libbot.utils import config_get
db_config: Mapping[str, Any] = config_get("database")
if db_config["user"] is not None and db_config["password"] is not None:
con_string = "mongodb://{0}:{1}@{2}:{3}/{4}".format(
db_config["user"],
db_config["password"],
db_config["host"],
db_config["port"],
db_config["name"],
)
else:
con_string = "mongodb://{0}:{1}/{2}".format(db_config["host"], db_config["port"], db_config["name"])
# Async declarations
db_client = AsyncClient(con_string)
db: AsyncDatabase = db_client.get_database(name=db_config["name"])
col_users: AsyncCollection = db.get_collection("users")
col_wallets: AsyncCollection = db.get_collection("wallets")
# Update indexes
db.dispatch.get_collection("users").create_index("id", unique=True)
db.dispatch.get_collection("wallets").create_index(["owner_id", "guild_id"], unique=False)

35
modules/logging_utils.py Normal file
View File

@@ -0,0 +1,35 @@
import logging
from logging import Logger
from pathlib import Path
from typing import Any, Dict
from libbot.utils import config_get
def get_logging_config() -> Dict[str, Any]:
return {
"version": 1,
"disable_existing_loggers": False,
"handlers": {
"file": {
"class": "logging.handlers.RotatingFileHandler",
"filename": str(Path("logs/latest.log")),
"maxBytes": 500000,
"backupCount": 10,
"formatter": "simple",
},
"console": {"class": "logging.StreamHandler", "formatter": "systemd"},
},
"formatters": {
"simple": {"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"},
"systemd": {"format": "%(name)s - %(levelname)s - %(message)s"},
},
"root": {
"level": "DEBUG" if config_get("debug") else "INFO",
"handlers": ["file", "console"],
},
}
def get_logger(name: str) -> Logger:
return logging.getLogger(name)

22
modules/migrator.py Normal file
View File

@@ -0,0 +1,22 @@
from typing import Any, Mapping
from libbot.utils import config_get
from mongodb_migrations.cli import MigrationManager
from mongodb_migrations.config import Configuration
def migrate_database() -> None:
"""Apply migrations from folder `migrations/` to the database"""
db_config: Mapping[str, Any] = config_get("database")
manager_config = Configuration(
{
"mongo_host": db_config["host"],
"mongo_port": db_config["port"],
"mongo_database": db_config["name"],
"mongo_username": db_config["user"],
"mongo_password": db_config["password"],
}
)
manager = MigrationManager(manager_config)
manager.run()

3
modules/scheduler.py Normal file
View File

@@ -0,0 +1,3 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()

6
modules/utils.py Normal file
View File

@@ -0,0 +1,6 @@
def hex_to_int(hex_color: str) -> int:
return int(hex_color.lstrip("#"), 16)
def int_to_hex(integer_color: int) -> str:
return "#" + format(integer_color, "06x")

25
pyproject.toml Normal file
View File

@@ -0,0 +1,25 @@
[project]
name = "QuizBot"
authors = [{ name = "Profitroll" }]
readme = "README.md"
requires-python = ">=3.11"
[tool.black]
line-length = 108
target-version = ["py311", "py312", "py313"]
[tool.isort]
profile = "black"
[tool.mypy]
namespace_packages = true
install_types = true
strict = true
show_error_codes = true
[tool.pylint]
disable = ["line-too-long"]
[tool.pylint.main]
extension-pkg-whitelist = ["ujson"]
py-version = 3.11

5
requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
apscheduler~=3.11.0
async_pymongo==0.1.11
libbot[speed,pycord,cache]==4.1.0
mongodb-migrations==1.3.1
pytz~=2025.1