53 Commits

Author SHA1 Message Date
kku
25f2595cf7 Added a note about a debug setting 2025-02-10 10:21:16 +01:00
a4967e5b77 Merge pull request 'Updated README for caching' (#25) from feature/cache into dev
Reviewed-on: #25
2025-02-10 11:17:47 +02:00
kku
0228983d52 Updated README for caching 2025-02-10 10:15:28 +01:00
39a90d39fd Merge pull request 'Moved create_cache_client() to cache_manager' (#24) from feature/cache into dev
Reviewed-on: #24
2025-02-10 00:45:37 +02:00
9e9b6bc7dc Moved create_cache_client() to cache_manager 2025-02-09 23:45:18 +01:00
d19a7381f3 Merge pull request 'Basic caching support' (#23) from feature/cache into dev
Reviewed-on: #23
2025-02-10 00:41:34 +02:00
cda570eb37 Changed default URI for Redis 2025-02-09 23:40:09 +01:00
a54ef39e53 Added basic Redis support 2025-02-09 23:37:44 +01:00
b3a7e3623a Implemented memcached caching 2025-02-09 23:00:18 +01:00
d402c520a5 Merge pull request 'Update dependency libbot to v4.0.2' (#22) from renovate/libbot-4.x into dev
Reviewed-on: #22
2025-01-02 23:49:39 +02:00
751662ba6b Update dependency libbot to v4.0.2 2025-01-02 15:22:32 +02:00
09da774f26 Merge pull request 'Update dependency libbot to v4.0.1' (#21) from renovate/libbot-4.x into dev
Reviewed-on: #21
2024-12-29 18:55:38 +02:00
kku
f97e6e4e93 Removed now unused typing-extensions 2024-12-29 17:54:40 +01:00
8f73cab327 Update dependency libbot to v4.0.1 2024-12-29 18:09:02 +02:00
kku
cd9e4187f7 Closes #17; Fixed migrations 2024-12-27 23:00:27 +01:00
kku
4f6c99f211 WIP: Database migrations 2024-12-27 22:43:40 +01:00
kku
eb8019ccfe Updated the documentation 2024-12-27 22:33:58 +01:00
kku
ce57755eee Improved type-hinting for loggers and removed legacy 2024-12-27 22:23:41 +01:00
kku
7a64e334d2 Added logging for thread creation message deletion 2024-12-27 20:42:12 +01:00
kku
9417951f55 Removed legacy and improved documentation 2024-12-27 20:30:32 +01:00
kku
6060a3df83 Closes #18 2024-12-27 20:16:30 +01:00
eed084cd91 Replaced legacy Union[] with new syntax 2024-12-27 00:18:54 +01:00
7b64f6938b Added a nice comment explaining the sync call inside async function 2024-12-26 20:28:44 +01:00
c54586940e Added a fix in case typing extensions is missing 2024-12-26 20:28:06 +01:00
0195706e92 PycordBot now handles scheduler on its own 2024-12-26 19:18:02 +01:00
162898f5eb WIP: libbot 4.0.0 adoption 2024-12-26 19:12:50 +01:00
a753918432 Added experimental Docker instructions 2024-12-17 22:29:03 +01:00
36d63e0240 Changed the Client structure 2024-12-17 22:14:06 +01:00
62a36a3747 Merge pull request 'Update dependency libbot to v3.3.1' (#16) from renovate/libbot-3.x into dev
Reviewed-on: #16
2024-12-17 00:03:17 +02:00
8edf70e21c Update dependency libbot to v3.3.1 2024-12-17 00:00:36 +02:00
kku
ccf7e06e67 Added indirect dependencies for WaifuPicsPython 2024-12-16 21:04:47 +01:00
kku
af04a7dce6 Changed scheduler declaration 2024-12-16 20:59:10 +01:00
kku
ef4e42fff0 Improved error handling 2024-12-16 20:49:58 +01:00
kku
c05cf64ae0 Improved type-hinting and overall sanity checks implemented. 2024-12-16 20:34:37 +01:00
kku
5c763fc02e Moved waifu pics to a separate module and improved shutdown handling. 2024-12-16 20:00:32 +01:00
kku
53d5827ed9 Added a check to make sure the reply message is sent in an existing channel. 2024-12-16 19:54:53 +01:00
kku
454ce2b6fb Working on #14 2024-12-16 16:25:35 +01:00
kku
41112018da Working on #13 2024-12-15 23:36:48 +01:00
kku
982d0bce43 General improvements and refactoring 2024-12-15 23:21:41 +01:00
19d2ef281c Clarified a few points about the configuration 2024-12-15 23:45:57 +02:00
591f427ac9 Added installation instructions and dropped Python 3.8 support 2024-12-15 23:43:33 +02:00
fc97d317ec Merge pull request 'Update dependency apscheduler to v3.11.0' (#12) from renovate/apscheduler-3.x into dev
Reviewed-on: #12
2024-11-24 22:18:01 +02:00
d311c02d45 Update dependency apscheduler to v3.11.0 2024-11-24 21:54:59 +02:00
cac7d6a307 Merge pull request 'Update dependency pymongo to ~=4.10.0' (#11) from renovate/pymongo-4.x into dev
Reviewed-on: #11
2024-10-01 23:12:42 +03:00
46d066e643 Update dependency pymongo to ~=4.10.0 2024-10-01 05:33:19 +03:00
1d7e07a4cc Merge pull request 'Update dependency pymongo to ~=4.9.1' (#10) from renovate/pymongo-4.x into dev
Reviewed-on: #10
2024-09-19 20:51:35 +03:00
8ac9e17284 Update dependency pymongo to ~=4.9.1 2024-09-19 01:42:03 +03:00
79a51d5e93 Merge pull request 'Update dependency libbot to v3.2.3' (#9) from renovate/libbot-3.x into dev
Reviewed-on: #9
2024-07-10 08:12:45 +03:00
1c906c2126 Update dependency libbot to v3.2.3 2024-07-10 00:43:55 +03:00
207dc6cae7 Merge pull request 'Update dependency pymongo to ~=4.8.0' (#8) from renovate/pymongo-4.x into dev
Reviewed-on: #8
2024-06-26 22:16:26 +03:00
63e8cde861 Update dependency pymongo to ~=4.8.0 2024-06-26 22:03:59 +03:00
d4c02ee54b Merge pull request 'Update dependency aiofiles to v24' (#7) from renovate/aiofiles-24.x into dev
Reviewed-on: #7
2024-06-24 18:27:35 +03:00
93ccc7cd69 Update dependency aiofiles to v24 2024-06-24 14:55:18 +03:00
38 changed files with 1512 additions and 661 deletions

40
.dockerignore Normal file
View File

@@ -0,0 +1,40 @@
# Include any files or directories that you don't want to be copied to your
# container here (e.g., local build artifacts, temporary files, etc.).
#
# For more help, visit the .dockerignore file reference guide at
# https://docs.docker.com/go/build-context-dockerignore/
**/.DS_Store
**/__pycache__
**/.venv
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/bin
**/charts
**/docker-compose*
**/compose.y*ml
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
config.json
.renovaterc
**/.idea
**/.mypy_cache
validation

27
Dockerfile Normal file
View File

@@ -0,0 +1,27 @@
ARG PYTHON_VERSION=3.12.8
FROM python:${PYTHON_VERSION}-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
RUN --mount=type=cache,target=/root/.cache/pip \
--mount=type=bind,source=requirements.txt,target=requirements.txt \
python -m pip install -r requirements.txt
USER appuser
COPY . .
ENTRYPOINT ["python", "main.py", "--migrate"]

121
README.md
View File

@@ -1,2 +1,121 @@
# HoloBotDiscord <h1 align="center">HoloBot Discord</h1>
<p align="center">Small Discord bot made on Py-Cord</p>
<p align="center">
<a href="https://git.end-play.xyz/HoloUA/Discord/src/branch/master/LICENSE"><img alt="License: GPL" src="https://img.shields.io/badge/License-GPL-blue"></a>
<a href="https://git.end-play.xyz/HoloUA/Discord"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
</p>
## Installation from release
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Install Python 3.11+
3. Download the [latest release](https://git.end-play.xyz/HoloUA/Discord/releases/latest)'s archive
4. Extract the archive
5. Navigate to the extracted folder and subfolder `Discord` in it
6. Create a virtual environment:
`python -m venv .venv` or `virtualenv .venv`
7. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
8. Install the dependencies:
`python -m pip install -r requirements.txt`
9. Run the bot with `python main.py` after completing the [configuration](#Configuration)
## Installation with Git
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Install Python 3.11+
3. Clone the repository:
`git clone https://git.end-play.xyz/HoloUA/Discord.git`
4. `cd Discord`
5. Create a virtual environment:
`python -m venv .venv` or `virtualenv .venv`
6. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
7. Install the dependencies:
`python -m pip install -r requirements.txt`
8. Run the bot with `python main.py` after completing the [configuration](#Configuration)
## Upgrading with Git
1. Go to the bot's directory
2. `git pull`
3. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
4. Update the dependencies:
`python -m pip install -r requirements.txt`
5. First start after the upgrade must initiate the migration:
`python main.py --migrate`
6. Now the bot is up to date and the next run will not require `--migrate` anymore
## Configuration
There's a file `config_example.json` which contains default configuration
and should be used as a base config.
Copy this file to `config.json` and open it with any text editor of your liking.
Modify the newly created configuration file to fit your needs.
Mandatory keys to modify:
- guild
- bot.owner
- bot.bot_token
- database.*
- categories.*
- channels.*
- roles.*
After all of that you're good to go! Happy using :)
## Caching
Although general database access speed is fast, caching might become helpful for
bigger servers with many bot interactions. Currently, Redis and Memcached are supported.
Configuration happens through the config key `caching`.
Set `caching.type` to the service of you choice ("redis" or "memcached") and then update
the URI to access the service. It's Redis' default URI format for Redis and "address:port"
for Memcached.
Which one should I choose?
| Service | Read/write speed | Config flexibility |
|-----------|------------------|--------------------|
| Redis | High | Very flexible |
| Memcached | Very high | Basic |
> Performance difference between Redis and Memcached is generally quite low, so your setup
> should normally depend more on the configuration flexibility than on raw speed.
## Debugging
There's a config key `debug` that can be set to `true` for debugging purposes.
It should be set to `false` in production, otherwise log becomes very verbose and might
contain data that shouldn't normally be logged.
## Docker [Experimental]
As an experiment, Docker deployment option has been added.
### Building the image
1. `git clone https://git.end-play.xyz/HoloUA/Discord.git`
2. `cd Discord`
3. `docker build -t holoua-discord .`
### Starting the bot
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Download
the [configuration example file](https://git.end-play.xyz/HoloUA/Discord/src/branch/main/config_example.json) and
store it somewhere you would like your bot to access it
3. Complete the [configuration](#Configuration) step for this file
4. `docker run -d -v /path/to/config.json:/app/config.json holoua-discord`

0
classes/__init__.py Normal file
View File

3
classes/cache/__init__.py vendored Normal file
View File

@@ -0,0 +1,3 @@
from .holo_cache import HoloCache
from .holo_cache_memcached import HoloCacheMemcached
from .holo_cache_redis import HoloCacheRedis

44
classes/cache/holo_cache.py vendored Normal file
View File

@@ -0,0 +1,44 @@
from abc import ABC, abstractmethod
from typing import Any, Dict
import pymemcache
import redis
class HoloCache(ABC):
client: pymemcache.Client | redis.Redis
@classmethod
@abstractmethod
def from_config(cls, engine_config: Dict[str, Any]) -> Any:
pass
@abstractmethod
def get_json(self, key: str) -> Any | None:
# TODO This method must also carry out ObjectId conversion!
pass
@abstractmethod
def get_string(self, key: str) -> str | None:
pass
@abstractmethod
def get_object(self, key: str) -> Any | None:
pass
@abstractmethod
def set_json(self, key: str, value: Any) -> None:
# TODO This method must also carry out ObjectId conversion!
pass
@abstractmethod
def set_string(self, key: str, value: str) -> None:
pass
@abstractmethod
def set_object(self, key: str, value: Any) -> None:
pass
@abstractmethod
def delete(self, key: str) -> None:
pass

89
classes/cache/holo_cache_memcached.py vendored Normal file
View File

@@ -0,0 +1,89 @@
import logging
from logging import Logger
from typing import Dict, Any
from pymemcache import Client
from modules.cache_utils import string_to_json, json_to_string
from . import HoloCache
logger: Logger = logging.getLogger(__name__)
class HoloCacheMemcached(HoloCache):
client: Client
def __init__(self, client: Client):
self.client = client
logger.info("Initialized Memcached for caching")
@classmethod
def from_config(cls, engine_config: Dict[str, Any]) -> "HoloCacheMemcached":
if "uri" not in engine_config:
raise KeyError(
"Cache configuration is invalid. Please check if all keys are set (engine: memcached)"
)
return cls(Client(engine_config["uri"], default_noreply=True))
def get_json(self, key: str) -> Any | None:
try:
result: Any | None = self.client.get(key, None)
logger.debug(
"Got json cache key '%s'%s",
key,
"" if result is not None else " (not found)",
)
except Exception as exc:
logger.error("Could not get json cache key '%s' due to: %s", key, exc)
return None
return None if result is None else string_to_json(result)
def get_string(self, key: str) -> str | None:
try:
result: str | None = self.client.get(key, None)
logger.debug(
"Got string cache key '%s'%s",
key,
"" if result is not None else " (not found)",
)
return result
except Exception as exc:
logger.error("Could not get string cache key '%s' due to: %s", key, exc)
return None
# TODO Implement binary deserialization
def get_object(self, key: str) -> Any | None:
raise NotImplementedError()
def set_json(self, key: str, value: Any) -> None:
try:
self.client.set(key, json_to_string(value))
logger.debug("Set json cache key '%s'", key)
except Exception as exc:
logger.error("Could not set json cache key '%s' due to: %s", key, exc)
return None
def set_string(self, key: str, value: str) -> None:
try:
self.client.set(key, value)
logger.debug("Set string cache key '%s'", key)
except Exception as exc:
logger.error("Could not set string cache key '%s' due to: %s", key, exc)
return None
# TODO Implement binary serialization
def set_object(self, key: str, value: Any) -> None:
raise NotImplementedError()
def delete(self, key: str) -> None:
try:
self.client.delete(key)
logger.debug("Deleted cache key '%s'", key)
except Exception as exc:
logger.error("Could not delete cache key '%s' due to: %s", key, exc)

89
classes/cache/holo_cache_redis.py vendored Normal file
View File

@@ -0,0 +1,89 @@
import logging
from logging import Logger
from typing import Dict, Any
from redis import Redis
from classes.cache import HoloCache
from modules.cache_utils import string_to_json, json_to_string
logger: Logger = logging.getLogger(__name__)
class HoloCacheRedis(HoloCache):
client: Redis
def __init__(self, client: Redis):
self.client = client
logger.info("Initialized Redis for caching")
@classmethod
def from_config(cls, engine_config: Dict[str, Any]) -> Any:
if "uri" not in engine_config:
raise KeyError(
"Cache configuration is invalid. Please check if all keys are set (engine: memcached)"
)
return cls(Redis.from_url(engine_config["uri"]))
def get_json(self, key: str) -> Any | None:
try:
result: Any | None = self.client.get(key)
logger.debug(
"Got json cache key '%s'%s",
key,
"" if result is not None else " (not found)",
)
except Exception as exc:
logger.error("Could not get json cache key '%s' due to: %s", key, exc)
return None
return None if result is None else string_to_json(result)
def get_string(self, key: str) -> str | None:
try:
result: str | None = self.client.get(key)
logger.debug(
"Got string cache key '%s'%s",
key,
"" if result is not None else " (not found)",
)
return result
except Exception as exc:
logger.error("Could not get string cache key '%s' due to: %s", key, exc)
return None
# TODO Implement binary deserialization
def get_object(self, key: str) -> Any | None:
raise NotImplementedError()
def set_json(self, key: str, value: Any) -> None:
try:
self.client.set(key, json_to_string(value))
logger.debug("Set json cache key '%s'", key)
except Exception as exc:
logger.error("Could not set json cache key '%s' due to: %s", key, exc)
return None
def set_string(self, key: str, value: str) -> None:
try:
self.client.set(key, value)
logger.debug("Set string cache key '%s'", key)
except Exception as exc:
logger.error("Could not set string cache key '%s' due to: %s", key, exc)
return None
# TODO Implement binary serialization
def set_object(self, key: str, value: Any) -> None:
raise NotImplementedError()
def delete(self, key: str) -> None:
try:
self.client.delete(key)
logger.debug("Deleted cache key '%s'", key)
except Exception as exc:
logger.error("Could not delete cache key '%s' due to: %s", key, exc)

22
classes/holo_bot.py Normal file
View File

@@ -0,0 +1,22 @@
import logging
from logging import Logger
from libbot.pycord.classes import PycordBot
from classes.cache.holo_cache_memcached import HoloCacheMemcached
from classes.cache.holo_cache_redis import HoloCacheRedis
from modules.cache_manager import create_cache_client
logger: Logger = logging.getLogger(__name__)
class HoloBot(PycordBot):
cache: HoloCacheMemcached | HoloCacheRedis | None = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._set_cache_engine()
def _set_cache_engine(self) -> None:
if "cache" in self.config and self.config["cache"]["type"] is not None:
self.cache = create_cache_client(self.config, self.config["cache"]["type"])

View File

@@ -1,144 +1,261 @@
import logging import logging
from typing import Any, Union from logging import Logger
from typing import Any, Dict
import discord from bson import ObjectId
import discord.member from discord import User, Member
from libbot.utils import config_get
from pymongo.results import InsertOneResult
from typing_extensions import deprecated
from modules.database import col_users, col_warnings from classes.cache import HoloCache
from modules.utils import config_get from errors import UserNotFoundError
from modules.database import col_warnings, col_users
logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)
class HoloUser: class HoloUser:
def __init__( def __init__(
self, user: Union[discord.User, discord.Member, discord.member.Member, int] self,
_id: ObjectId,
id: int,
custom_role: int | None,
custom_channel: int | None,
) -> None: ) -> None:
self._id: ObjectId = _id
self.id: int = id
self.custom_role: int | None = custom_role
self.custom_channel: int | None = custom_channel
@classmethod
async def from_user(
cls,
user: User | Member,
allow_creation: bool = True,
cache: HoloCache | None = None,
) -> "HoloUser":
"""Get an object that has a proper binding between Discord ID and database """Get an object that has a proper binding between Discord ID and database
### Args: ### Args:
* `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted * `user` (User | Member): Object from which an ID can be extracted
* `allow_creation` (bool, optional): Whether to allow creation of a new user record if none found. Defaults to True.
* `cache` (HoloCache | None, optional): Cache engine to get the cache from
### Raises: ### Raises:
* `UserNotFoundError`: User with such ID does not seem to exist in database * `UserNotFoundError`: User with such ID does not seem to exist in database
""" """
if cache is not None:
cached_entry: Dict[str, Any] | None = cache.get_json(f"user_{user.id}")
if hasattr(user, "id"): if cached_entry is not None:
self.id = user.id # type: ignore return cls(**cached_entry)
else:
self.id = user
jav_user = col_users.find_one({"user": self.id}) db_entry: Dict[str, Any] | None = await col_users.find_one({"id": user.id})
if jav_user is None: if db_entry is None:
raise UserNotFoundError(user=user, user_id=self.id) if not allow_creation:
raise UserNotFoundError(user=user, user_id=user.id)
self.db_id = jav_user["_id"] db_entry = HoloUser.get_defaults(user.id)
self.customrole = jav_user["customrole"] insert_result: InsertOneResult = await col_users.insert_one(db_entry)
self.customchannel = jav_user["customchannel"]
self.warnings = self.warns()
def warns(self) -> int: db_entry["_id"] = insert_result.inserted_id
if cache is not None:
cache.set_json(f"user_{user.id}", db_entry)
return cls(**db_entry)
@classmethod
async def from_id(cls, user_id: int) -> "HoloUser":
return NotImplemented
# TODO Deprecate and remove warnings
@deprecated("Warnings are deprecated")
async def get_warnings(self) -> int:
"""Get number of warnings user has """Get number of warnings user has
### Returns: ### Returns:
* `int`: Number of warnings * `int`: Number of warnings
""" """
warns = col_warnings.find_one({"user": self.id}) warns: Dict[str, Any] | None = await col_warnings.find_one({"id": self.id})
if warns == None:
return 0
else:
return warns["warns"]
def warn(self, count=1, reason: str = "Not provided") -> None: return 0 if warns is None else warns["warns"]
# TODO Deprecate and remove warnings
@deprecated("Warnings are deprecated")
async def warn(self, count: int = 1, reason: str = "Reason not provided") -> None:
"""Warn and add count to warns number """Warn and add count to warns number
### Args: ### Args:
* `count` (int, optional): Count of warnings to be added. Defaults to 1. * `count` (int, optional): Count of warnings to be added. Defaults to 1.
* `reason` (int, optional): Count of warnings to be added. Defaults to 1.
""" """
warns = col_warnings.find_one({"user": self.id}) warns: Dict[str, Any] | None = await col_warnings.find_one({"id": self.id})
if warns != None:
col_warnings.update_one( if warns is not None:
filter={"_id": self.db_id}, await col_warnings.update_one(
update={"$set": {"warns": warns["warns"] + count}}, {"_id": self._id},
{"$set": {"warns": warns["warns"] + count}},
) )
else: else:
col_warnings.insert_one(document={"user": self.id, "warns": count}) await col_warnings.insert_one(document={"id": self.id, "warns": count})
logger.info(f"User {self.id} was warned {count} times due to: {reason}")
def set(self, key: str, value: Any) -> None: logger.info("User %s was warned %s times due to: %s", self.id, count, reason)
"""Set attribute data and save it into database
async def _set(self, key: str, value: Any, cache: HoloCache | None = None) -> None:
"""Set attribute data and save it into the database
### Args: ### Args:
* `key` (str): Attribute to be changed * `key` (str): Attribute to be changed
* `value` (Any): Value to set * `value` (Any): Value to set
* `cache` (HoloCache | None, optional): Cache engine to write the update into
""" """
if not hasattr(self, key): if not hasattr(self, key):
raise AttributeError() raise AttributeError()
setattr(self, key, value)
col_users.update_one(
filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
)
logger.info(f"Set attribute {key} of user {self.id} to {value}")
async def is_moderator( setattr(self, key, value)
self, member: Union[discord.User, discord.Member, discord.member.Member]
) -> bool: await col_users.update_one(
{"_id": self._id}, {"$set": {key: value}}, upsert=True
)
self._update_cache(cache)
logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value)
async def _remove(self, key: str, cache: HoloCache | None = None) -> None:
"""Remove attribute data and save it into the database
### Args:
* `key` (str): Attribute to be removed
* `cache` (HoloCache | None, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
default_value: Any = HoloUser.get_default_value(key)
setattr(self, key, default_value)
await col_users.update_one(
{"_id": self._id}, {"$set": {key: default_value}}, upsert=True
)
self._update_cache(cache)
logger.info("Removed attribute '%s' of user %s", key, self.id)
def _get_cache_key(self) -> str:
return f"user_{self.id}"
def _update_cache(self, cache: HoloCache | None = None) -> None:
if cache is None:
return
user_dict: Dict[str, Any] = self._to_dict()
if user_dict is not None:
cache.set_json(self._get_cache_key(), user_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: HoloCache | None = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
@staticmethod
def get_defaults(user_id: int | None = None) -> Dict[str, Any]:
return {
"id": user_id,
"custom_role": None,
"custom_channel": None,
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in HoloUser.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in HoloUser")
return HoloUser.get_defaults()[key]
def _to_dict(self) -> Dict[str, Any]:
return {
"_id": self._id,
"id": self.id,
"custom_role": self.custom_role,
"custom_channel": self.custom_channel,
}
async def set_custom_channel(
self, channel_id: int, cache: HoloCache | None = None
) -> None:
await self._set("custom_channel", channel_id, cache=cache)
async def set_custom_role(
self, role_id: int, cache: HoloCache | None = None
) -> None:
await self._set("custom_role", role_id, cache=cache)
async def remove_custom_channel(self, cache: HoloCache | None = None) -> None:
await self._remove("custom_channel", cache=cache)
async def remove_custom_role(self, cache: HoloCache | None = None) -> None:
await self._remove("custom_role", cache=cache)
async def purge(self, cache: HoloCache | None = None) -> None:
"""Completely remove user data from database. Will not remove transactions logs and warnings.
### Args:
* `cache` (HoloCache | None, optional): Cache engine to write the update into
"""
await col_users.delete_one({"_id": self._id})
self._delete_cache(cache)
@staticmethod
async def is_moderator(member: User | Member) -> bool:
"""Check if user is moderator or council member """Check if user is moderator or council member
### Args: ### Args:
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object * `member` (User | Member): Member object
### Returns: ### Returns:
`bool`: `True` if member is a moderator or member of council and `False` if not `bool`: `True` if member is a moderator or member of council and `False` if not
""" """
if isinstance(member, discord.User): if isinstance(member, User):
return False
moderator_role = await config_get("moderators", "roles")
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id == moderator_role or role.id == council_role:
return True
return False return False
async def is_council( moderator_role: int | None = await config_get("moderators", "roles")
self, member: Union[discord.User, discord.Member, discord.member.Member] council_role: int | None = await config_get("council", "roles")
) -> bool:
for role in member.roles:
if role.id in (moderator_role, council_role):
return True
return False
@staticmethod
async def is_council(member: User | Member) -> bool:
"""Check if user is a member of council """Check if user is a member of council
### Args: ### Args:
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object * `member` (User | Member): Member object
### Returns: ### Returns:
`bool`: `True` if member is a member of council and `False` if not `bool`: `True` if member is a member of council and `False` if not
""" """
if isinstance(member, discord.User): if isinstance(member, User):
return False return False
council_role = await config_get("council", "roles") council_role = await config_get("council", "roles")
for role in member.roles: for role in member.roles:
if role.id == council_role: if role.id == council_role:
return True return True
return False
# def purge(self) -> None: return False
# """Completely remove data from database. Will not remove transactions logs and warnings."""
# col_users.delete_one(filter={"_id": self.db_id})
# self.unauthorize()

View File

@@ -1,108 +1,39 @@
import logging import logging
import sys
from logging import Logger
from discord import ApplicationContext, Embed, User, option, slash_command from discord import (
ApplicationContext,
Embed,
User,
option,
slash_command,
Role,
TextChannel,
)
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot from libbot.utils import config_get
from enums.colors import Color from classes.holo_bot import HoloBot
from modules.scheduled import scheduler from enums import Color
from modules.utils import config_get from modules.scheduler import scheduler
from modules.utils_sync import config_get_sync, guild_name from modules.utils_sync import guild_name
from modules.waifu_pics import waifu_pics
logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
class Admin(commands.Cog): class Admin(commands.Cog):
def __init__(self, client: PycordBot): """Cog with utility commands for admins."""
self.client = client
# @slash_command( def __init__(self, client: HoloBot):
# name="warning", self.client: HoloBot = client
# description="Попередити юзера про порушення правил",
# guild_ids=[config_get_sync("guild")],
# )
# @option("user", description="Користувач")
# @option("reason", description="Причина")
# async def warn_cmd(
# self,
# ctx: ApplicationContext,
# user: User,
# reason: str = "Не вказана",
# ):
# logging.info(f"User {ctx.user.id} warned {user.id} for {reason}")
# await ctx.defer()
# jav_user = HoloUser(user)
# if ctx.user.id in await config_get("admins"):
# logging.info(
# f"Moderator {guild_name(ctx.user)} warned {guild_name(user)} for {reason} (has {jav_user.warnings} warns)"
# )
# if jav_user.warnings >= 5:
# logging.info(
# f"User {guild_name(user)} was banned due to a big amount of warns ({jav_user.warnings})"
# )
# await user.send(
# embed=Embed(
# title="Перманентне блокування",
# description=f"Вас було заблоковано за неодноразове порушення правил сервера.",
# color=Color.fail,
# )
# )
# await user.ban(reason=reason)
# elif jav_user.warnings >= 2:
# logging.info(
# f"User {guild_name(user)} was muted due to a big amount of warns ({jav_user.warnings})"
# )
# jav_user.warn(reason=reason)
# await user.send(
# embed=Embed(
# title="Тимчасове блокування",
# description=f"Причина: `{reason}`\n\nНа вашому рахунку вже {jav_user.warnings} попереджень. Вас було тимчасово заблоковано на **1 годину**.\n\nЯкщо Ви продовжите порушувати правила сервера згодом Вас заблокують.",
# color=0xDED56B,
# )
# )
# await user.timeout_for(timedelta(hours=1), reason=reason)
# else:
# jav_user.warn()
# await ctx.respond(
# embed=Embed(
# title="Попередження",
# description=f"{user.mention} Будь ласка, не порушуйте правила. Ви отримали попередження з причини `{reason}`.\n\nЯкщо Ви продовжите порушувати правила це може призвести до блокування в спільноті.",
# color=0xDED56B,
# )
# )
# else:
# logging.warning(
# f"User {guild_name(ctx.user)} tried to use /warn but permission denied"
# )
# await ctx.respond(
# embed=Embed(
# title="Відмовлено в доступі",
# description="Здається, це команда лише для модераторів",
# color=Color.fail,
# )
# )
# mod_role = ds_utils.get(
# ctx.user.guild.roles, id=await config_get("moderators", "roles")
# )
# admin_chan = ds_utils.get(
# ctx.user.guild.channels,
# id=await config_get("adminchat", "channels", "text"),
# )
# await admin_chan.send(
# content=f"{mod_role.mention}",
# embed=Embed(
# title="Неавторизований запит",
# description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
# color=Color.fail,
# ),
# )
@slash_command( @slash_command(
name="clear", name="clear",
description="Видалити деяку кількість повідомлень в каналі", description="Видалити деяку кількість повідомлень в каналі",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option("amount", description="Кількість") @option("amount", description="Кількість")
@option("user", description="Користувач", default=None) @option("user", description="Користувач", default=None)
@@ -111,7 +42,11 @@ class Admin(commands.Cog):
ctx: ApplicationContext, ctx: ApplicationContext,
amount: int, amount: int,
user: User, user: User,
): ) -> None:
"""Command /clear <amount> [<user>]
Removes last <amount> messages in the current channel. Optionally from a specific user.
"""
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
logging.info( logging.info(
"User %s removed %s message(s) in %s", "User %s removed %s message(s) in %s",
@@ -119,89 +54,111 @@ class Admin(commands.Cog):
amount, amount,
ctx.channel.id, ctx.channel.id,
) )
await ctx.respond( await ctx.respond(
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0 embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
) )
if user == None:
if user is None:
await ctx.channel.purge(limit=amount) await ctx.channel.purge(limit=amount)
else: else:
await ctx.channel.purge( await ctx.channel.purge(
limit=amount, check=lambda msg: msg.author == user limit=amount, check=lambda msg: msg.author == user
) )
else:
return
logging.warning( logging.warning(
"User %s tried to use /clear but permission denied", "User %s tried to use /clear but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Відмовлено в доступі", title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів", description="Здається, це команда лише для модераторів",
color=Color.fail, color=Color.FAIL,
) )
) )
mod_role = ds_utils.get(
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan = ds_utils.get( admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
if admin_chan is not None:
await admin_chan.send( await admin_chan.send(
content=f"{mod_role.mention}", content="" if mod_role is None else mod_role.mention,
embed=Embed( embed=Embed(
title="Неавторизований запит", title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail, color=Color.FAIL,
), ),
) )
@slash_command( @slash_command(
name="reboot", name="reboot",
description="Перезапустити бота", description="Перезапустити бота",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
async def reboot_cmd(self, ctx: ApplicationContext): async def reboot_cmd(self, ctx: ApplicationContext) -> None:
"""Command /reboot
Stops the bot. Is called "reboot" because it's assumed that the bot has automatic restart.
"""
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user)) logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Вимикаюсь...", title="Вимикаюсь...",
description="Спробую перезавантажитись за 5 секунд", description="Спробую перезавантажитись за 5 секунд",
) )
) )
scheduler.shutdown() scheduler.shutdown()
await self.client.close() await self.client.close()
exit() await waifu_pics._client_session.close()
else:
sys.exit()
logging.warning( logging.warning(
"User %s tried to use /reboot but permission denied", "User %s tried to use /reboot but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Відмовлено в доступі", title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів", description="Здається, це команда лише для модераторів",
color=Color.fail, color=Color.FAIL,
) )
) )
mod_role = ds_utils.get(
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan = ds_utils.get( admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
if admin_chan is not None:
await admin_chan.send( await admin_chan.send(
content=f"{mod_role.mention}", content="" if mod_role is None else mod_role.mention,
embed=Embed( embed=Embed(
title="Неавторизований запит", title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail, color=Color.FAIL,
), ),
) )
def setup(client: PycordBot): def setup(client: HoloBot) -> None:
client.add_cog(Admin(client)) client.add_cog(Admin(client))

View File

@@ -1,26 +1,31 @@
import logging import logging
from logging import Logger
from typing import Dict, List, Any
from discord import Cog, Message from discord import Cog, Message
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from modules.database import col_analytics from modules.database import col_analytics
logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
class Analytics(commands.Cog): class Analytics(commands.Cog):
def __init__(self, client: PycordBot): def __init__(self, client: HoloBot):
self.client = client self.client: HoloBot = client
@Cog.listener() @Cog.listener()
async def on_message(self, message: Message): async def on_message(self, message: Message) -> None:
"""Listener that collects analytical data (stickers, attachments, messages)."""
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot == False) and (message.author.bot is False)
and (message.author.system == False) and (message.author.system is False)
): ):
stickers = [] # Handle stickers
stickers: List[Dict[str, Any]] = []
for sticker in message.stickers: for sticker in message.stickers:
stickers.append( stickers.append(
{ {
@@ -31,7 +36,9 @@ class Analytics(commands.Cog):
} }
) )
attachments = [] # Handle attachments
attachments: List[Dict[str, Any]] = []
for attachment in message.attachments: for attachment in message.attachments:
attachments.append( attachments.append(
{ {
@@ -46,9 +53,10 @@ class Analytics(commands.Cog):
} }
) )
col_analytics.insert_one( # Insert entry into the database
await col_analytics.insert_one(
{ {
"user": message.author.id, "user_id": message.author.id,
"channel": message.channel.id, "channel": message.channel.id,
"content": message.content, "content": message.content,
"stickers": stickers, "stickers": stickers,
@@ -57,5 +65,5 @@ class Analytics(commands.Cog):
) )
def setup(client: PycordBot): def setup(client: HoloBot) -> None:
client.add_cog(Analytics(client)) client.add_cog(Analytics(client))

View File

@@ -1,52 +1,91 @@
from discord import ApplicationContext, Embed, option import logging
from logging import Logger
from typing import Any, Dict
from discord import ApplicationContext, Embed, option, TextChannel, Role
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.abc import GuildChannel from discord.abc import GuildChannel
from discord.commands import SlashCommandGroup from discord.commands import SlashCommandGroup
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot from libbot.utils import config_get
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser from classes.holo_user import HoloUser
from enums.colors import Color from enums import Color
from modules.database import col_users from modules.database import col_users
from modules.utils import config_get from modules.utils_sync import guild_name
from modules.utils_sync import config_get_sync, guild_name
logger: Logger = logging.getLogger(__name__)
class CustomChannels(commands.Cog): class CustomChannels(commands.Cog):
def __init__(self, client: PycordBot): def __init__(self, client: HoloBot):
self.client = client self.client: HoloBot = client
@commands.Cog.listener() @commands.Cog.listener()
async def on_guild_channel_delete(self, channel: GuildChannel): async def on_guild_channel_delete(self, channel: GuildChannel) -> None:
col_users.find_one_and_update( await col_users.find_one_and_update(
{"customchannel": channel.id}, {"$set": {"customchannel": None}} {"custom_channel": channel.id}, {"$set": {"custom_channel": None}}
) )
customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом") custom_channel_group: SlashCommandGroup = SlashCommandGroup(
"customchannel", "Керування особистим каналом"
)
@customchannel.command( @custom_channel_group.command(
name="get", name="get",
description="Отримати персональний текстовий канал", description="Отримати персональний текстовий канал",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option("name", description="Назва каналу") @option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції") @option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки") @option("threads", description="Дозволити гілки")
async def customchannel_get_cmd( async def custom_channel_get_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
): ) -> None:
holo_user_ctx = HoloUser(ctx.user) """Command /customchannel get <name> <reactions> <threads>
Command to create a custom channel for a user.
"""
holo_user_ctx: HoloUser = await HoloUser.from_user(
ctx.user, cache=self.client.cache
)
# Return if the user is using the command outside of a guild
if not hasattr(ctx.author, "guild"):
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="Виконання за межами сервера не є можливим.",
color=Color.FAIL,
)
)
return
# Return if the user already has a custom channel
if holo_user_ctx.custom_channel is not None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.FAIL,
)
)
return
if holo_user_ctx.customchannel == None:
await ctx.defer() await ctx.defer()
created_channel = await ctx.user.guild.create_text_channel(
created_channel: TextChannel = await ctx.user.guild.create_text_channel(
name=name, name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал", reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get( category=ds_utils.get(
ctx.author.guild.categories, ctx.author.guild.categories,
id=await config_get("customchannels", "categories"), id=await config_get("custom_channels", "categories"),
), ),
) )
await created_channel.set_permissions( await created_channel.set_permissions(
ctx.user.guild.default_role, ctx.user.guild.default_role,
send_messages=False, send_messages=False,
@@ -62,55 +101,64 @@ class CustomChannels(commands.Cog):
embed_links=True, embed_links=True,
manage_channels=True, manage_channels=True,
) )
holo_user_ctx.set("customchannel", created_channel.id)
await holo_user_ctx.set_custom_channel(
created_channel.id, cache=self.client.cache
)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Створено канал", title="Створено канал",
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`", description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.success, color=Color.SUCCESS,
)
)
bots = await config_get("bots")
for bot in bots:
await created_channel.set_permissions(
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
else:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.fail,
) )
) )
@customchannel.command( bots: Dict[str, Any] = await config_get("bots")
for bot in bots:
role: Role | None = ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"])
if role is not None:
await created_channel.set_permissions(
role,
view_channel=False,
)
@custom_channel_group.command(
name="edit", name="edit",
description="Змінити параметри особистого каналу", description="Змінити параметри особистого каналу",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option("name", description="Назва каналу") @option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції") @option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки") @option("threads", description="Дозволити гілки")
async def customchannel_edit_cmd( async def custom_channel_edit_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
): ) -> None:
holo_user_ctx = HoloUser(ctx.user) """Command /customchannel edit <name> <reactions> <threads>
custom_channel = ds_utils.get( Command to change properties of a custom channel.
ctx.guild.channels, id=holo_user_ctx.customchannel """
holo_user_ctx: HoloUser = await HoloUser.from_user(
ctx.user, cache=self.client.cache
) )
custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.custom_channel
)
# Return if the channel was not found
if custom_channel is None: if custom_channel is None:
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Канал не знайдено", title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail, color=Color.FAIL,
) )
) )
return return
await custom_channel.edit(name=name) await custom_channel.edit(name=name)
await custom_channel.set_permissions( await custom_channel.set_permissions(
ctx.user.guild.default_role, ctx.user.guild.default_role,
@@ -119,68 +167,89 @@ class CustomChannels(commands.Cog):
create_public_threads=threads, create_public_threads=threads,
create_private_threads=threads, create_private_threads=threads,
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Канал змінено", title="Канал змінено",
description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`", description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`",
color=Color.fail, color=Color.FAIL,
) )
) )
@customchannel.command( @custom_channel_group.command(
name="remove", name="remove",
description="Відібрати канал, знищуючи його, та частково повернути кошти", description="Відібрати канал, знищуючи його, та частково повернути кошти",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option("confirm", description="Підтвердження операції") @option("confirm", description="Підтвердження операції")
async def customchannel_remove_cmd( async def custom_channel_remove_cmd(
self, ctx: ApplicationContext, confirm: bool = False self, ctx: ApplicationContext, confirm: bool = False
): ) -> None:
holo_user_ctx = HoloUser(ctx.user) """Command /customchannel remove [<confirm>]
if holo_user_ctx.customchannel is not None: Command to remove a custom channel. Requires additional confirmation."""
await ctx.defer() holo_user_ctx: HoloUser = await HoloUser.from_user(
custom_channel = ds_utils.get( ctx.user, cache=self.client.cache
ctx.guild.channels, id=holo_user_ctx.customchannel
) )
if custom_channel is None:
await ctx.respond( # Return if the user does not have a custom channel
embed=Embed( if holo_user_ctx.custom_channel is None:
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail,
)
)
holo_user_ctx.set("customchannel", None)
return
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.fail,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.default,
)
)
else:
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Помилка виконання", title="Помилка виконання",
description="У вас немає особистого каналу.", description="У вас немає особистого каналу.",
color=Color.fail, color=Color.FAIL,
) )
) )
return
await ctx.defer()
custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.custom_channel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.FAIL,
)
)
await holo_user_ctx.remove_custom_channel(cache=self.client.cache)
return
# Return if the confirmation is missing
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.FAIL,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
await holo_user_ctx.remove_custom_channel(cache=self.client.cache)
try:
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.DEFAULT,
)
)
except Exception as exc:
logger.warning(
"Could not send a custom channel removal confirmation due to: %s", exc
)
def setup(client: PycordBot): def setup(client: HoloBot) -> None:
client.add_cog(CustomChannels(client)) client.add_cog(CustomChannels(client))

View File

@@ -1,109 +1,159 @@
import logging import logging
from logging import Logger
from os import makedirs from os import makedirs
from pathlib import Path from pathlib import Path
from typing import List, Dict, Any
from uuid import uuid4 from uuid import uuid4
from discord import ApplicationContext, Embed, File, option from discord import ApplicationContext, Embed, File, option, Role, TextChannel
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.commands import SlashCommandGroup from discord.commands import SlashCommandGroup
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot from libbot.utils import config_get, json_write
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser from classes.holo_user import HoloUser
from enums.colors import Color from enums import Color
from modules.database import col_users from modules.utils_sync import guild_name
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
class Data(commands.Cog): class Data(commands.Cog):
def __init__(self, client: PycordBot): def __init__(self, client: HoloBot):
self.client = client self.client: HoloBot = client
data = SlashCommandGroup("data", "Керування даними користувачів") data: SlashCommandGroup = SlashCommandGroup("data", "Керування даними користувачів")
@data.command( @data.command(
name="export", name="export",
description="Експортувати дані", description="Експортувати дані",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option( @option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_export_cmd(self, ctx: ApplicationContext, kind: str): async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None:
"""Command /data export <kind>
Command to export specific kind of data."""
await ctx.defer() await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or ( # Return if the user is not an owner and not in the council
await holo_user.is_council(ctx.author) if (ctx.user.id not in self.client.owner_ids) and not (
await HoloUser.is_council(ctx.author)
): ):
logging.info( logging.info(
"Moderator %s exported current users list", guild_name(ctx.user) "User %s tried to use /export but permission denied",
guild_name(ctx.user),
) )
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
)
)
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
),
)
return
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
makedirs("tmp", exist_ok=True) makedirs("tmp", exist_ok=True)
uuid = str(uuid4())
uuid: str = str(uuid4())
if kind == "Користувачі": if kind == "Користувачі":
users = [] users: List[Dict[str, Any]] = []
for member in ctx.guild.members: for member in ctx.guild.members:
users.append( users.append(
{ {
"id": member.id, "id": member.id,
"nick": member.nick, "nick": member.nick,
"username": f"{member.name}#{member.discriminator}", "username": member.name,
"bot": member.bot, "bot": member.bot,
} }
) )
json_write_sync(users, str(Path(f"tmp/{uuid}"))) # Temporary file must be written synchronously,
# otherwise it will not be there when ctx.respond() is be called
json_write(users, Path(f"tmp/{uuid}"))
await ctx.respond( await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
)
else:
logging.info(
"User %s tried to use /export but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
# TODO Deprecate this command
@data.command( @data.command(
name="migrate", name="migrate",
description="Мігрувати всіх користувачів до бази", description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option( @option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str): async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None:
"""Command /migrate <kind>
Command to migrate specific kind of data.
Migration of users in this case means creation of their DB entries."""
await ctx.defer() await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or ( # Return if the user is not an owner and not in the council
await holo_user.is_council(ctx.author) if (ctx.user.id not in self.client.owner_ids) and not (
await HoloUser.is_council(ctx.author)
): ):
logging.info(
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
)
)
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
),
)
return
logging.info( logging.info(
"Moderator %s started migration of all members to the database", "Moderator %s started migration of all members to the database",
guild_name(ctx.user), guild_name(ctx.user),
@@ -113,55 +163,17 @@ class Data(commands.Cog):
for member in ctx.guild.members: for member in ctx.guild.members:
if member.bot: if member.bot:
continue continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id await HoloUser.from_user(member, cache=self.client.cache)
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Міграцію завершено", title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.", description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.success, color=Color.SUCCESS,
) )
) )
else:
logging.info(
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
def setup(client: PycordBot): def setup(client: HoloBot) -> None:
client.add_cog(Data(client)) client.add_cog(Data(client))

View File

@@ -1,40 +1,42 @@
import logging import logging
from logging import Logger
from discord import ApplicationContext, Embed, User, option, slash_command from discord import ApplicationContext, Embed, User, option, slash_command
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot from libbot.utils import config_get
from WaifuPicsPython import WaifuAsync
from modules.utils import config_get from classes.holo_bot import HoloBot
from modules.utils_sync import config_get_sync, guild_name from modules.utils_sync import guild_name
from modules.waifu_pics import waifu_pics
logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
wafiu_pics = WaifuAsync()
class Fun(commands.Cog): class Fun(commands.Cog):
def __init__(self, client: PycordBot): def __init__(self, client: HoloBot):
self.client = client self.client: HoloBot = client
@slash_command( @slash_command(
name="action", name="action",
description="Провести над користувачем РП дію", description="Провести над користувачем РП дію",
guild_ids=[config_get_sync("guild")], guild_ids=[config_get("guild")],
) )
@option( @option(
"type", "type",
description="Тип дії, яку хочете провести з користувачем", description="Тип дії, яку хочете провести з користувачем",
choices=config_get_sync("actions").keys(), choices=config_get("actions").keys(),
) )
@option("user", description="Користувач") @option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User): async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None:
"""Command /action <type> <user>
Command to perform some RP action on a user and send them a GIF."""
await ctx.defer() await ctx.defer()
action = await config_get("category", "actions", type) action: str = await config_get("category", "actions", type)
action_verb = await config_get("action", "actions", type) action_verb: str = await config_get("action", "actions", type)
image = await wafiu_pics.sfw(action) image_url: str = await waifu_pics.sfw(action)
logger.info( logger.info(
"User %s (%s) %s %s (%s) with image %s", "User %s (%s) %s %s (%s) with image %s",
@@ -43,17 +45,17 @@ class Fun(commands.Cog):
action_verb, action_verb,
guild_name(user), guild_name(user),
user.id, user.id,
image, image_url,
) )
embed = Embed( embed: Embed = Embed(
description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**", description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**",
color=0x2F3136, color=0x2F3136,
) )
embed.set_image(url=image) embed.set_image(url=image_url)
await ctx.respond(embed=embed) await ctx.respond(embed=embed)
def setup(client: PycordBot): def setup(client: HoloBot) -> None:
client.add_cog(Fun(client)) client.add_cog(Fun(client))

View File

@@ -1,67 +1,81 @@
from discord import Member, Message import logging
from logging import Logger
from discord import Member, Message, TextChannel, MessageType
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot from libbot.utils import config_get
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser
from modules.database import col_users from modules.database import col_users
from modules.utils import config_get
logger: Logger = logging.getLogger(__name__)
class Logger(commands.Cog): class Logger(commands.Cog):
def __init__(self, client: PycordBot): def __init__(self, client: HoloBot):
self.client = client self.client: HoloBot = client
@commands.Cog.listener() @commands.Cog.listener()
async def on_message(self, message: Message): async def on_message(self, message: Message):
"""Message listener. All actions on messages remain here for now."""
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot == False) and (message.author.bot is False)
and (message.author.system == False) and (message.author.system is False)
): ):
if col_users.find_one({"user": message.author.id}) is None: await HoloUser.from_user(message.author, cache=self.client.cache)
user = {}
defaults = await config_get("user", "defaults")
user["user"] = message.author.id if (
(message.type == MessageType.thread_created)
for key in defaults: and (message.channel is not None)
user[key] = defaults[key] and (
await col_users.count_documents({"custom_channel": message.channel.id})
col_users.insert_one(document=user) > 0
)
):
try:
logger.info(
"Deleting the thread creation message in a custom channel %s",
message.channel.id,
)
await message.delete()
except Exception as exc:
logger.warning(
"Could not delete the thread creation message in a custom channel %s due to %s",
message.channel.id,
exc,
)
@commands.Cog.listener() @commands.Cog.listener()
async def on_member_join(self, member: Member): async def on_member_join(self, member: Member) -> None:
welcome_chan = ds_utils.get( """Member join handler. All actions on member join remain here for now."""
welcome_chan: TextChannel | None = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels, self.client.get_guild(await config_get("guild")).channels,
id=await config_get("welcome", "channels", "text"), id=await config_get("welcome", "channels", "text"),
) )
rules_chan = ds_utils.get( rules_chan: TextChannel | None = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels, self.client.get_guild(await config_get("guild")).channels,
id=await config_get("rules", "channels", "text"), id=await config_get("rules", "channels", "text"),
) )
if ( if (
(member != self.client.user) (member != self.client.user)
and (member.bot == False) and (member.bot is False)
and (member.system == False) and (member.system is False)
): ):
if welcome_chan is not None and rules_chan is not None:
await welcome_chan.send( await welcome_chan.send(
content=(await config_get("welcome", "messages")).format( content=(await config_get("welcome", "messages")).format(
mention=member.mention, rules=rules_chan.mention mention=member.mention, rules=rules_chan.mention
) )
) )
else:
logger.warning("Could not find a welcome and/or rules channel by id")
if col_users.find_one({"user": member.id}) is None: await HoloUser.from_user(member, cache=self.client.cache)
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
def setup(client: PycordBot): def setup(client: HoloBot) -> None:
client.add_cog(Logger(client)) client.add_cog(Logger(client))

58
cogs/utility.py Normal file
View File

@@ -0,0 +1,58 @@
import logging
from logging import Logger
from discord import Activity, ActivityType
from discord.ext import commands
from libbot.utils import config_get
from classes.holo_bot import HoloBot
logger: Logger = logging.getLogger(__name__)
class Utility(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Listener for the event when bot connects to Discord and becomes "ready"."""
logger.info("Logged in as %s", self.client.user)
activity_type: str = await config_get("type", "status")
activity_message: str = await config_get("message", "status")
if activity_type == "playing":
await self.client.change_presence(
activity=Activity(type=ActivityType.playing, name=activity_message)
)
elif activity_type == "watching":
await self.client.change_presence(
activity=Activity(type=ActivityType.watching, name=activity_message)
)
elif activity_type == "listening":
await self.client.change_presence(
activity=Activity(type=ActivityType.listening, name=activity_message)
)
elif activity_type == "streaming":
await self.client.change_presence(
activity=Activity(type=ActivityType.streaming, name=activity_message)
)
elif activity_type == "competing":
await self.client.change_presence(
activity=Activity(type=ActivityType.competing, name=activity_message)
)
elif activity_type == "custom":
await self.client.change_presence(
activity=Activity(type=ActivityType.custom, name=activity_message)
)
else:
return
logger.info(
"Set activity type to %s with message %s", activity_type, activity_message
)
def setup(client: HoloBot) -> None:
client.add_cog(Utility(client))

View File

@@ -22,18 +22,22 @@
"port": 27017, "port": 27017,
"name": "holo_discord" "name": "holo_discord"
}, },
"cache": {
"type": null,
"memcached": {
"uri": "127.0.0.1:11211"
},
"redis": {
"uri": "redis://127.0.0.1:6379/0"
}
},
"logging": { "logging": {
"size": 512, "size": 512,
"location": "logs" "location": "logs"
}, },
"defaults": { "defaults": {},
"user": {
"customrole": null,
"customchannel": null
}
},
"categories": { "categories": {
"customchannels": 0 "custom_channels": 0
}, },
"channels": { "channels": {
"text": { "text": {

1
enums/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .colors import Color

View File

@@ -1,6 +1,7 @@
from enum import IntEnum from enum import IntEnum
class Color(IntEnum): class Color(IntEnum):
fail = 0xd6345b FAIL = 0xD6345B
success = 0x84d961 SUCCESS = 0x84D961
default = 0xa7a6ab DEFAULT = 0xA7A6AB

1
errors/__init__.py Normal file
View File

@@ -0,0 +1 @@
from .user import UserNotFoundError

10
errors/user.py Normal file
View File

@@ -0,0 +1,10 @@
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)

101
main.py
View File

@@ -1,76 +1,71 @@
import contextlib
import logging import logging
import sys
from argparse import ArgumentParser
from logging import Logger
from pathlib import Path
from discord import Activity, ActivityType from discord import LoginFailure, Intents
from libbot.utils import config_get
from modules.client import client from classes.holo_bot import HoloBot
from modules.scheduled import scheduler from modules.migrator import migrate_database
from modules.utils import config_get from modules.scheduler import scheduler
from modules.utils_sync import config_get_sync
if not Path("config.json").exists():
print(
"Config file is missing: Make sure the configuration file 'config.json' is in place.",
flush=True,
)
sys.exit()
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO if not config_get("debug") else logging.DEBUG,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s", format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]", datefmt="[%X]",
) )
logger = logging.getLogger(__name__) logger: Logger = logging.getLogger(__name__)
try: # Declare the parser that retrieves the command line arguments
import uvloop # type: ignore parser = ArgumentParser(
prog="HoloUA Discord",
description="Discord bot for the HoloUA community.",
)
# Add a switch argument --migrate to be parsed...
parser.add_argument("--migrate", action="store_true")
# ...and parse the arguments we added
args = parser.parse_args()
# Try to import the module that improves performance
# and ignore errors when module is not installed
with contextlib.suppress(ImportError):
import uvloop
uvloop.install() uvloop.install()
except ImportError:
pass
@client.event def main() -> None:
async def on_ready(): # Perform migration if command line argument was provided
logger.info("Logged in as %s", client.user) if args.migrate:
logger.info("Performing migrations...")
migrate_database()
activity_type = await config_get("type", "status") intents: Intents = Intents().all()
activity_message = await config_get("message", "status") client: HoloBot = HoloBot(intents=intents, scheduler=scheduler)
if activity_type == "playing":
await client.change_presence(
activity=Activity(type=ActivityType.playing, name=activity_message)
)
elif activity_type == "watching":
await client.change_presence(
activity=Activity(type=ActivityType.watching, name=activity_message)
)
elif activity_type == "listening":
await client.change_presence(
activity=Activity(type=ActivityType.listening, name=activity_message)
)
elif activity_type == "streaming":
await client.change_presence(
activity=Activity(type=ActivityType.streaming, name=activity_message)
)
elif activity_type == "competing":
await client.change_presence(
activity=Activity(type=ActivityType.competing, name=activity_message)
)
elif activity_type == "custom":
await client.change_presence(
activity=Activity(type=ActivityType.custom, name=activity_message)
)
else:
return
logger.info(
"Set activity type to %s with message %s", activity_type, activity_message
)
def main():
client.load_extension("cogs") client.load_extension("cogs")
try: try:
scheduler.start() client.run(config_get("bot_token", "bot"))
client.run(config_get_sync("bot_token", "bot")) except LoginFailure as exc:
logger.error("Provided bot token is invalid: %s", exc)
except KeyboardInterrupt: except KeyboardInterrupt:
scheduler.shutdown() logger.info("KeyboardInterrupt received: Shutting down gracefully.")
exit() finally:
sys.exit()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -0,0 +1,79 @@
import logging
from logging import Logger
from libbot.utils import config_get, config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
# Categories
config_set(
"custom_channels",
config_get("customchannels", "categories"),
"categories",
)
config_delete("customchannels", "categories")
# User defaults
config_delete(
"user",
"defaults",
)
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"customchannel": {"$exists": True}},
{"$rename": {"customchannel": "custom_channel"}},
)
self.db.users.update_many(
{"customrole": {"$exists": True}},
{"$rename": {"customrole": "custom_role"}},
)
def downgrade(self):
try:
# Categories
config_set(
"customchannels",
config_get("custom_channels", "categories"),
"categories",
)
config_delete("custom_channels", "categories")
# User defaults
config_set(
"customrole",
None,
"defaults",
"user",
)
config_set(
"customchannel",
None,
"defaults",
"user",
)
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"custom_channel": {"$exists": True}},
{"$rename": {"custom_channel": "customchannel"}},
)
self.db.users.update_many(
{"custom_role": {"$exists": True}},
{"$rename": {"custom_role": "customrole"}},
)

View File

@@ -0,0 +1,63 @@
import logging
from logging import Logger
from libbot.utils import config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
config_set(
"cache",
{
"type": None,
"memcached": {"uri": "127.0.0.1:11211"},
"redis": {"uri": "redis://127.0.0.1:6379/0"},
},
*[],
)
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "id"}},
)
self.db.analytics.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "user_id"}},
)
self.db.warnings.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "user_id"}},
)
def downgrade(self):
try:
config_delete("cache", *[])
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"id": {"$exists": True}},
{"$rename": {"id": "user"}},
)
self.db.analytics.update_many(
{"user": {"$exists": True}},
{"$rename": {"user_id": "user"}},
)
self.db.warnings.update_many(
{"user": {"$exists": True}},
{"$rename": {"user_id": "user"}},
)

29
modules/cache_manager.py Normal file
View File

@@ -0,0 +1,29 @@
from typing import Dict, Any, Literal
from classes.cache.holo_cache_memcached import HoloCacheMemcached
from classes.cache.holo_cache_redis import HoloCacheRedis
def create_cache_client(
config: Dict[str, Any],
engine: Literal["memcached", "redis"] | None = None,
) -> HoloCacheMemcached | HoloCacheRedis:
if engine not in ["memcached", "redis"] or engine is None:
raise KeyError(
f"Incorrect cache engine provided. Expected 'memcached' or 'redis', got '{engine}'"
)
if "cache" not in config or engine not in config["cache"]:
raise KeyError(
f"Cache configuration is invalid. Please check if all keys are set (engine: '{engine}')"
)
match engine:
case "memcached":
return HoloCacheMemcached.from_config(config["cache"][engine])
case "redis":
return HoloCacheRedis.from_config(config["cache"][engine])
case _:
raise KeyError(
f"Cache implementation for the engine '{engine}' is not present."
)

25
modules/cache_utils.py Normal file
View File

@@ -0,0 +1,25 @@
from copy import deepcopy
from typing import Any
from bson import ObjectId
from ujson import dumps, loads
def json_to_string(json_object: Any) -> str:
json_object_copy: Any = deepcopy(json_object)
if isinstance(json_object_copy, dict) and "_id" in json_object_copy:
json_object_copy["_id"] = str(json_object_copy["_id"])
return dumps(
json_object_copy, ensure_ascii=False, indent=0, escape_forward_slashes=False
)
def string_to_json(json_string: str) -> Any:
json_object: Any = loads(json_string)
if "_id" in json_object:
json_object["_id"] = ObjectId(json_object["_id"])
return json_object

View File

@@ -1,6 +0,0 @@
from discord import Intents
from libbot.pycord.classes import PycordBot
intents = Intents().all()
intents.members = True
client = PycordBot(intents=intents)

View File

@@ -1,12 +1,19 @@
from typing import Dict, Any
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase
from libbot.utils import config_get
from pymongo import MongoClient from pymongo import MongoClient
from ujson import loads from pymongo.synchronous.collection import Collection
from pymongo.synchronous.database import Database
with open("config.json", "r", encoding="utf-8") as f: db_config: Dict[str, Any] = config_get("database")
db_config = loads(f.read())["database"]
f.close()
db_client = MongoClient( con_string: str = (
"mongodb://{0}:{1}@{2}:{3}/{4}".format( "mongodb://{0}:{1}/{2}".format(
db_config["host"], db_config["port"], db_config["name"]
)
if db_config["user"] is None or db_config["password"] is None
else "mongodb://{0}:{1}@{2}:{3}/{4}".format(
db_config["user"], db_config["user"],
db_config["password"], db_config["password"],
db_config["host"], db_config["host"],
@@ -14,14 +21,23 @@ db_client = MongoClient(
db_config["name"], db_config["name"],
) )
) )
db = db_client.get_database(name=db_config["name"])
collections = db.list_collection_names() db_client: AsyncClient = AsyncClient(con_string)
db_client_sync: MongoClient = MongoClient(con_string)
for collection in ["users", "warnings", "scheduler", "analytics"]: # Async declarations per default
if not collection in collections: db: AsyncDatabase = db_client.get_database(name=db_config["name"])
db.create_collection(collection)
col_users = db.get_collection("users") col_users: AsyncCollection = db.get_collection("users")
col_warnings = db.get_collection("warnings") col_warnings: AsyncCollection = db.get_collection("warnings")
col_analytics = db.get_collection("analytics") col_analytics: AsyncCollection = db.get_collection("analytics")
# Sync declarations as a fallback
sync_db: Database = db_client_sync.get_database(name=db_config["name"])
sync_col_users: Collection = sync_db.get_collection("users")
sync_col_warnings: Collection = sync_db.get_collection("warnings")
sync_col_analytics: Collection = sync_db.get_collection("analytics")
# Update indexes
sync_col_users.create_index(["id"], unique=True)

22
modules/migrator.py Normal file
View File

@@ -0,0 +1,22 @@
from typing import Any, Mapping
from libbot.utils import config_get
from mongodb_migrations.cli import MigrationManager
from mongodb_migrations.config import Configuration
def migrate_database() -> None:
"""Apply migrations from folder `migrations/` to the database"""
database_config: Mapping[str, Any] = config_get("database")
manager_config = Configuration(
{
"mongo_host": database_config["host"],
"mongo_port": database_config["port"],
"mongo_database": database_config["name"],
"mongo_username": database_config["user"],
"mongo_password": database_config["password"],
}
)
manager = MigrationManager(manager_config)
manager.run()

View File

@@ -1,3 +1,3 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler() scheduler: AsyncIOScheduler = AsyncIOScheduler()

View File

@@ -1,38 +0,0 @@
from typing import Any
import aiofiles
from ujson import dumps, loads
async def json_read(path: str) -> Any:
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: str) -> None:
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
)
async def config_get(key: str, *path: str) -> Any:
this_key = await json_read("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(key: str, value: Any, *path: str) -> None:
this_dict = await json_read("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
await json_write(this_dict, "config.json")
return

View File

@@ -1,45 +1,8 @@
from typing import Any, Union
from discord import Member, User from discord import Member, User
from ujson import dumps, loads
def json_read_sync(path: str) -> Any: def guild_name(member: Member | User) -> str:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Union[Member, User]) -> str:
if isinstance(member, User): if isinstance(member, User):
return member.name return member.name
if member.nick == None:
return member.name return member.name if member.nick is None else member.nick
else:
return member.nick

3
modules/waifu_pics.py Normal file
View File

@@ -0,0 +1,3 @@
from WaifuPicsPython import WaifuAsync
waifu_pics: WaifuAsync = WaifuAsync()

View File

@@ -1,8 +1,13 @@
aiofiles==23.2.1 # Waifu pics related dependencies (not listed directly by waifupics)
apscheduler==3.10.4 aiohttp>=3.10.0
pymongo~=4.7.3 requests>=2.32.2
requests~=2.32.3
aiofiles~=24.1.0
apscheduler>=3.10.0
async_pymongo==0.1.11
libbot[speed,pycord]==4.0.2
mongodb-migrations==1.3.1
pymemcache~=4.0.0
redis~=5.2.1
ujson~=5.10.0 ujson~=5.10.0
WaifuPicsPython==0.2.0 WaifuPicsPython==0.2.0
--extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
libbot[speed,pycord]==3.2.2

View File

@@ -1,14 +1,14 @@
{ {
"$jsonSchema": { "$jsonSchema": {
"required": [ "required": [
"user", "user_id",
"channel", "channel",
"content", "content",
"stickers", "stickers",
"attachments" "attachments"
], ],
"properties": { "properties": {
"user": { "user_id": {
"bsonType": "long", "bsonType": "long",
"description": "Discord ID of user" "description": "Discord ID of user"
}, },
@@ -17,7 +17,10 @@
"description": "Discord ID of a channel" "description": "Discord ID of a channel"
}, },
"content": { "content": {
"bsonType": ["null", "string"], "bsonType": [
"null",
"string"
],
"description": "Text of the message" "description": "Text of the message"
}, },
"stickers": { "stickers": {
@@ -40,7 +43,7 @@
"format": { "format": {
"bsonType": "array" "bsonType": "array"
}, },
"user": { "user_id": {
"bsonType": "string" "bsonType": "string"
} }
} }

View File

@@ -1,22 +1,27 @@
{ {
"$jsonSchema": { "$jsonSchema": {
"required": [ "required": [
"user", "id",
"customrole", "custom_role",
"customchannel" "custom_channel"
], ],
"properties": { "properties": {
"user": { "id": {
"bsonType": "long", "bsonType": "long",
"description": "Discord ID of user" "description": "Discord ID of user"
}, },
"customrole": { "custom_role": {
"bsonType": ["null", "long"], "bsonType": [
"null",
"long"
],
"description": "Discord ID of custom role or 'null' if not set" "description": "Discord ID of custom role or 'null' if not set"
}, },
"customchannel": { "custom_channel": {
"bsonType": ["null", "long"], "bsonType": [
"null",
"long"
],
"description": "Discord ID of custom channel or 'null' if not set" "description": "Discord ID of custom channel or 'null' if not set"
} }
} }

View File

@@ -1,15 +1,15 @@
{ {
"$jsonSchema": { "$jsonSchema": {
"required": [ "required": [
"user", "user_id",
"warns" "warnings"
], ],
"properties": { "properties": {
"user": { "user_id": {
"bsonType": "long", "bsonType": "long",
"description": "Discord ID of user" "description": "Discord ID of user"
}, },
"warns": { "warnings": {
"bsonType": "int", "bsonType": "int",
"description": "Number of warnings on count" "description": "Number of warnings on count"
} }