1 Commits

Author SHA1 Message Date
72ccaa04a4 Merge pull request 'v0.1.0-rc.1' (#5) from dev into main
Reviewed-on: #5
2024-06-23 13:06:12 +03:00
33 changed files with 687 additions and 1235 deletions

View File

@@ -1,40 +0,0 @@
# Include any files or directories that you don't want to be copied to your
# container here (e.g., local build artifacts, temporary files, etc.).
#
# For more help, visit the .dockerignore file reference guide at
# https://docs.docker.com/go/build-context-dockerignore/
**/.DS_Store
**/__pycache__
**/.venv
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/bin
**/charts
**/docker-compose*
**/compose.y*ml
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
config.json
.renovaterc
**/.idea
**/.mypy_cache
validation

View File

@@ -1,27 +0,0 @@
ARG PYTHON_VERSION=3.12.8
FROM python:${PYTHON_VERSION}-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
RUN --mount=type=cache,target=/root/.cache/pip \
--mount=type=bind,source=requirements.txt,target=requirements.txt \
python -m pip install -r requirements.txt
USER appuser
COPY . .
ENTRYPOINT ["python", "main.py", "--migrate"]

121
README.md
View File

@@ -1,121 +1,2 @@
<h1 align="center">HoloBot Discord</h1>
# HoloBotDiscord
<p align="center">Small Discord bot made on Py-Cord</p>
<p align="center">
<a href="https://git.end-play.xyz/HoloUA/Discord/src/branch/master/LICENSE"><img alt="License: GPL" src="https://img.shields.io/badge/License-GPL-blue"></a>
<a href="https://git.end-play.xyz/HoloUA/Discord"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
</p>
## Installation from release
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Install Python 3.11+
3. Download the [latest release](https://git.end-play.xyz/HoloUA/Discord/releases/latest)'s archive
4. Extract the archive
5. Navigate to the extracted folder and subfolder `Discord` in it
6. Create a virtual environment:
`python -m venv .venv` or `virtualenv .venv`
7. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
8. Install the dependencies:
`python -m pip install -r requirements.txt`
9. Run the bot with `python main.py` after completing the [configuration](#Configuration)
## Installation with Git
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Install Python 3.11+
3. Clone the repository:
`git clone https://git.end-play.xyz/HoloUA/Discord.git`
4. `cd Discord`
5. Create a virtual environment:
`python -m venv .venv` or `virtualenv .venv`
6. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
7. Install the dependencies:
`python -m pip install -r requirements.txt`
8. Run the bot with `python main.py` after completing the [configuration](#Configuration)
## Upgrading with Git
1. Go to the bot's directory
2. `git pull`
3. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
4. Update the dependencies:
`python -m pip install -r requirements.txt`
5. First start after the upgrade must initiate the migration:
`python main.py --migrate`
6. Now the bot is up to date and the next run will not require `--migrate` anymore
## Configuration
There's a file `config_example.json` which contains default configuration
and should be used as a base config.
Copy this file to `config.json` and open it with any text editor of your liking.
Modify the newly created configuration file to fit your needs.
Mandatory keys to modify:
- guild
- bot.owner
- bot.bot_token
- database.*
- categories.*
- channels.*
- roles.*
After all of that you're good to go! Happy using :)
## Caching
Although general database access speed is fast, caching might become helpful for
bigger servers with many bot interactions. Currently, Redis and Memcached are supported.
Configuration happens through the config key `caching`.
Set `caching.type` to the service of you choice ("redis" or "memcached") and then update
the URI to access the service. It's Redis' default URI format for Redis and "address:port"
for Memcached.
Which one should I choose?
| Service | Read/write speed | Config flexibility |
|-----------|------------------|--------------------|
| Redis | High | Very flexible |
| Memcached | Very high | Basic |
> Performance difference between Redis and Memcached is generally quite low, so your setup
> should normally depend more on the configuration flexibility than on raw speed.
## Debugging
There's a config key `debug` that can be set to `true` for debugging purposes.
It should be set to `false` in production, otherwise log becomes very verbose and might
contain data that shouldn't normally be logged.
## Docker [Experimental]
As an experiment, Docker deployment option has been added.
### Building the image
1. `git clone https://git.end-play.xyz/HoloUA/Discord.git`
2. `cd Discord`
3. `docker build -t holoua-discord .`
### Starting the bot
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Download
the [configuration example file](https://git.end-play.xyz/HoloUA/Discord/src/branch/main/config_example.json) and
store it somewhere you would like your bot to access it
3. Complete the [configuration](#Configuration) step for this file
4. `docker run -d -v /path/to/config.json:/app/config.json holoua-discord`

View File

View File

@@ -1,28 +0,0 @@
import logging
from logging import Logger
from typing import Literal
from libbot.cache.classes import CacheMemcached, CacheRedis
from libbot.cache.manager import create_cache_client
from libbot.pycord.classes import PycordBot
logger: Logger = logging.getLogger(__name__)
class HoloBot(PycordBot):
cache: CacheMemcached | CacheRedis | None = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._set_cache_engine()
def _set_cache_engine(self) -> None:
cache_type: Literal["redis", "memcached"] | None = self.config["cache"]["type"]
if "cache" in self.config and cache_type is not None:
self.cache = create_cache_client(
self.config,
cache_type,
prefix=self.config["cache"][cache_type]["prefix"],
default_ttl_seconds=3600,
)

View File

@@ -1,227 +1,144 @@
import logging
from logging import Logger
from typing import Any, Dict, Optional
from typing import Any, Union
from bson import ObjectId
from discord import User, Member
from libbot.cache.classes import Cache
from libbot.utils import config_get
from pymongo.results import InsertOneResult
import discord
import discord.member
from errors import UserNotFoundError
from modules.database import col_users
from modules.database import col_users, col_warnings
from modules.utils import config_get
logger: Logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)
class HoloUser:
def __init__(
self,
_id: ObjectId,
id: int,
custom_role: int | None,
custom_channel: int | None,
self, user: Union[discord.User, discord.Member, discord.member.Member, int]
) -> None:
self._id: ObjectId = _id
self.id: int = id
self.custom_role: int | None = custom_role
self.custom_channel: int | None = custom_channel
@classmethod
async def from_user(
cls,
user: User | Member,
allow_creation: bool = True,
cache: Optional[Cache] = None,
) -> "HoloUser":
"""Get an object that has a proper binding between Discord ID and database
### Args:
* `user` (User | Member): Object from which an ID can be extracted
* `allow_creation` (bool, optional): Whether to allow creation of a new user record if none found. Defaults to True.
* `cache` (Cache, optional): Cache engine to get the cache from
* `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted
### Raises:
* `UserNotFoundError`: User with such ID does not seem to exist in database
"""
if cache is not None:
cached_entry: Dict[str, Any] | None = cache.get_json(f"user_{user.id}")
if cached_entry is not None:
return cls(**cached_entry)
if hasattr(user, "id"):
self.id = user.id # type: ignore
else:
self.id = user
db_entry: Dict[str, Any] | None = await col_users.find_one({"id": user.id})
jav_user = col_users.find_one({"user": self.id})
if db_entry is None:
if not allow_creation:
raise UserNotFoundError(user=user, user_id=user.id)
if jav_user is None:
raise UserNotFoundError(user=user, user_id=self.id)
db_entry = HoloUser.get_defaults(user.id)
self.db_id = jav_user["_id"]
insert_result: InsertOneResult = await col_users.insert_one(db_entry)
self.customrole = jav_user["customrole"]
self.customchannel = jav_user["customchannel"]
self.warnings = self.warns()
db_entry["_id"] = insert_result.inserted_id
def warns(self) -> int:
"""Get number of warnings user has
if cache is not None:
cache.set_json(f"user_{user.id}", db_entry)
### Returns:
* `int`: Number of warnings
"""
warns = col_warnings.find_one({"user": self.id})
if warns == None:
return 0
else:
return warns["warns"]
return cls(**db_entry)
def warn(self, count=1, reason: str = "Not provided") -> None:
"""Warn and add count to warns number
@classmethod
async def from_id(cls, user_id: int) -> "HoloUser":
raise NotImplementedError()
### Args:
* `count` (int, optional): Count of warnings to be added. Defaults to 1.
"""
warns = col_warnings.find_one({"user": self.id})
if warns != None:
col_warnings.update_one(
filter={"_id": self.db_id},
update={"$set": {"warns": warns["warns"] + count}},
)
else:
col_warnings.insert_one(document={"user": self.id, "warns": count})
logger.info(f"User {self.id} was warned {count} times due to: {reason}")
async def _set(self, key: str, value: Any, cache: Optional[Cache] = None) -> None:
"""Set attribute data and save it into the database
def set(self, key: str, value: Any) -> None:
"""Set attribute data and save it into database
### Args:
* `key` (str): Attribute to be changed
* `value` (Any): Value to set
* `cache` (Cache, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
await col_users.update_one(
{"_id": self._id}, {"$set": {key: value}}, upsert=True
col_users.update_one(
filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
)
logger.info(f"Set attribute {key} of user {self.id} to {value}")
self._update_cache(cache)
logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value)
async def _remove(self, key: str, cache: Optional[Cache] = None) -> None:
"""Remove attribute data and save it into the database
### Args:
* `key` (str): Attribute to be removed
* `cache` (Cache, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
default_value: Any = HoloUser.get_default_value(key)
setattr(self, key, default_value)
await col_users.update_one(
{"_id": self._id}, {"$set": {key: default_value}}, upsert=True
)
self._update_cache(cache)
logger.info("Removed attribute '%s' of user %s", key, self.id)
def _get_cache_key(self) -> str:
return f"user_{self.id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
user_dict: Dict[str, Any] = self._to_dict()
if user_dict is not None:
cache.set_json(self._get_cache_key(), user_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
@staticmethod
def get_defaults(user_id: int | None = None) -> Dict[str, Any]:
return {
"id": user_id,
"custom_role": None,
"custom_channel": None,
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in HoloUser.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in HoloUser")
return HoloUser.get_defaults()[key]
def _to_dict(self) -> Dict[str, Any]:
return {
"_id": self._id,
"id": self.id,
"custom_role": self.custom_role,
"custom_channel": self.custom_channel,
}
async def set_custom_channel(
self, channel_id: int, cache: Optional[Cache] = None
) -> None:
await self._set("custom_channel", channel_id, cache=cache)
async def set_custom_role(
self, role_id: int, cache: Optional[Cache] = None
) -> None:
await self._set("custom_role", role_id, cache=cache)
async def remove_custom_channel(self, cache: Optional[Cache] = None) -> None:
await self._remove("custom_channel", cache=cache)
async def remove_custom_role(self, cache: Optional[Cache] = None) -> None:
await self._remove("custom_role", cache=cache)
async def purge(self, cache: Optional[Cache] = None) -> None:
"""Completely remove user data from database. Only removes the user record from users collection.
### Args:
* `cache` (Cache, optional): Cache engine to write the update into
"""
await col_users.delete_one({"_id": self._id})
self._delete_cache(cache)
@staticmethod
async def is_moderator(member: User | Member) -> bool:
async def is_moderator(
self, member: Union[discord.User, discord.Member, discord.member.Member]
) -> bool:
"""Check if user is moderator or council member
### Args:
* `member` (User | Member): Member object
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
### Returns:
`bool`: `True` if member is a moderator or member of council and `False` if not
"""
if isinstance(member, User):
if isinstance(member, discord.User):
return False
moderator_role: int | None = await config_get("moderators", "roles")
council_role: int | None = await config_get("council", "roles")
moderator_role = await config_get("moderators", "roles")
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id in (moderator_role, council_role):
if role.id == moderator_role or role.id == council_role:
return True
return False
@staticmethod
async def is_council(member: User | Member) -> bool:
async def is_council(
self, member: Union[discord.User, discord.Member, discord.member.Member]
) -> bool:
"""Check if user is a member of council
### Args:
* `member` (User | Member): Member object
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
### Returns:
`bool`: `True` if member is a member of council and `False` if not
"""
if isinstance(member, User):
if isinstance(member, discord.User):
return False
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id == council_role:
return True
return False
# def purge(self) -> None:
# """Completely remove data from database. Will not remove transactions logs and warnings."""
# col_users.delete_one(filter={"_id": self.db_id})
# self.unauthorize()

View File

@@ -1,39 +1,108 @@
import logging
import sys
from logging import Logger
from discord import (
ApplicationContext,
Embed,
User,
option,
slash_command,
Role,
TextChannel,
)
from discord import ApplicationContext, Embed, User, option, slash_command
from discord import utils as ds_utils
from discord.ext import commands
from libbot.utils import config_get
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from enums import Color
from modules.scheduler import scheduler
from modules.utils_sync import guild_name
from modules.waifu_pics import waifu_pics
from enums.colors import Color
from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
logger: Logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class Admin(commands.Cog):
"""Cog with utility commands for admins."""
def __init__(self, client: PycordBot):
self.client = client
def __init__(self, client: HoloBot):
self.client: HoloBot = client
# @slash_command(
# name="warning",
# description="Попередити юзера про порушення правил",
# guild_ids=[config_get_sync("guild")],
# )
# @option("user", description="Користувач")
# @option("reason", description="Причина")
# async def warn_cmd(
# self,
# ctx: ApplicationContext,
# user: User,
# reason: str = "Не вказана",
# ):
# logging.info(f"User {ctx.user.id} warned {user.id} for {reason}")
# await ctx.defer()
# jav_user = HoloUser(user)
# if ctx.user.id in await config_get("admins"):
# logging.info(
# f"Moderator {guild_name(ctx.user)} warned {guild_name(user)} for {reason} (has {jav_user.warnings} warns)"
# )
# if jav_user.warnings >= 5:
# logging.info(
# f"User {guild_name(user)} was banned due to a big amount of warns ({jav_user.warnings})"
# )
# await user.send(
# embed=Embed(
# title="Перманентне блокування",
# description=f"Вас було заблоковано за неодноразове порушення правил сервера.",
# color=Color.fail,
# )
# )
# await user.ban(reason=reason)
# elif jav_user.warnings >= 2:
# logging.info(
# f"User {guild_name(user)} was muted due to a big amount of warns ({jav_user.warnings})"
# )
# jav_user.warn(reason=reason)
# await user.send(
# embed=Embed(
# title="Тимчасове блокування",
# description=f"Причина: `{reason}`\n\nНа вашому рахунку вже {jav_user.warnings} попереджень. Вас було тимчасово заблоковано на **1 годину**.\n\nЯкщо Ви продовжите порушувати правила сервера згодом Вас заблокують.",
# color=0xDED56B,
# )
# )
# await user.timeout_for(timedelta(hours=1), reason=reason)
# else:
# jav_user.warn()
# await ctx.respond(
# embed=Embed(
# title="Попередження",
# description=f"{user.mention} Будь ласка, не порушуйте правила. Ви отримали попередження з причини `{reason}`.\n\nЯкщо Ви продовжите порушувати правила це може призвести до блокування в спільноті.",
# color=0xDED56B,
# )
# )
# else:
# logging.warning(
# f"User {guild_name(ctx.user)} tried to use /warn but permission denied"
# )
# await ctx.respond(
# embed=Embed(
# title="Відмовлено в доступі",
# description="Здається, це команда лише для модераторів",
# color=Color.fail,
# )
# )
# mod_role = ds_utils.get(
# ctx.user.guild.roles, id=await config_get("moderators", "roles")
# )
# admin_chan = ds_utils.get(
# ctx.user.guild.channels,
# id=await config_get("adminchat", "channels", "text"),
# )
# await admin_chan.send(
# content=f"{mod_role.mention}",
# embed=Embed(
# title="Неавторизований запит",
# description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
# color=Color.fail,
# ),
# )
@slash_command(
name="clear",
description="Видалити деяку кількість повідомлень в каналі",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
@option("amount", description="Кількість")
@option("user", description="Користувач", default=None)
@@ -42,123 +111,97 @@ class Admin(commands.Cog):
ctx: ApplicationContext,
amount: int,
user: User,
) -> None:
"""Command /clear <amount> [<user>]
Removes last <amount> messages in the current channel. Optionally from a specific user.
"""
if ctx.user.id in self.client.owner_ids:
):
if ctx.user.id in self.client.config["bot"]["owners"]:
logging.info(
"User %s removed %s message(s) in %s",
ctx.user.id,
amount,
ctx.channel.id,
)
await ctx.respond(
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
)
if user is None:
if user == None:
await ctx.channel.purge(limit=amount)
else:
await ctx.channel.purge(
limit=amount, check=lambda msg: msg.author == user
)
return
else:
logging.warning(
"User %s tried to use /clear but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
color=Color.fail,
)
)
mod_role: Role | None = ds_utils.get(
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
color=Color.fail,
),
)
@slash_command(
name="reboot",
description="Перезапустити бота",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
async def reboot_cmd(self, ctx: ApplicationContext) -> None:
"""Command /reboot
Stops the bot. Is called "reboot" because it's assumed that the bot has automatic restart.
"""
async def reboot_cmd(self, ctx: ApplicationContext):
await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids:
if ctx.user.id in self.client.config["bot"]["owners"]:
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
await ctx.respond(
embed=Embed(
title="Вимикаюсь...",
description="Спробую перезавантажитись за 5 секунд",
)
)
scheduler.shutdown()
await self.client.close()
await waifu_pics._client_session.close()
sys.exit()
exit()
else:
logging.warning(
"User %s tried to use /reboot but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
color=Color.fail,
)
)
mod_role: Role | None = ds_utils.get(
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
color=Color.fail,
),
)
def setup(client: HoloBot) -> None:
def setup(client: PycordBot):
client.add_cog(Admin(client))

View File

@@ -1,31 +1,26 @@
import logging
from logging import Logger
from typing import Dict, List, Any
from discord import Cog, Message
from discord.ext import commands
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from modules.database import col_analytics
logger: Logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class Analytics(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
def __init__(self, client: PycordBot):
self.client = client
@Cog.listener()
async def on_message(self, message: Message) -> None:
"""Listener that collects analytical data (stickers, attachments, messages)."""
async def on_message(self, message: Message):
if (
(message.author != self.client.user)
and (message.author.bot is False)
and (message.author.system is False)
and (message.author.bot == False)
and (message.author.system == False)
):
# Handle stickers
stickers: List[Dict[str, Any]] = []
stickers = []
for sticker in message.stickers:
stickers.append(
{
@@ -36,9 +31,7 @@ class Analytics(commands.Cog):
}
)
# Handle attachments
attachments: List[Dict[str, Any]] = []
attachments = []
for attachment in message.attachments:
attachments.append(
{
@@ -53,10 +46,9 @@ class Analytics(commands.Cog):
}
)
# Insert entry into the database
await col_analytics.insert_one(
col_analytics.insert_one(
{
"user_id": message.author.id,
"user": message.author.id,
"channel": message.channel.id,
"content": message.content,
"stickers": stickers,
@@ -65,5 +57,5 @@ class Analytics(commands.Cog):
)
def setup(client: HoloBot) -> None:
def setup(client: PycordBot):
client.add_cog(Analytics(client))

View File

@@ -1,91 +1,52 @@
import logging
from logging import Logger
from typing import Any, Dict
from discord import ApplicationContext, Embed, option, TextChannel, Role
from discord import ApplicationContext, Embed, option
from discord import utils as ds_utils
from discord.abc import GuildChannel
from discord.commands import SlashCommandGroup
from discord.ext import commands
from libbot.utils import config_get
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser
from enums import Color
from enums.colors import Color
from modules.database import col_users
from modules.utils_sync import guild_name
logger: Logger = logging.getLogger(__name__)
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
class CustomChannels(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
def __init__(self, client: PycordBot):
self.client = client
@commands.Cog.listener()
async def on_guild_channel_delete(self, channel: GuildChannel) -> None:
await col_users.find_one_and_update(
{"custom_channel": channel.id}, {"$set": {"custom_channel": None}}
async def on_guild_channel_delete(self, channel: GuildChannel):
col_users.find_one_and_update(
{"customchannel": channel.id}, {"$set": {"customchannel": None}}
)
custom_channel_group: SlashCommandGroup = SlashCommandGroup(
"customchannel", "Керування особистим каналом"
)
customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом")
@custom_channel_group.command(
@customchannel.command(
name="get",
description="Отримати персональний текстовий канал",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки")
async def custom_channel_get_cmd(
async def customchannel_get_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
) -> None:
"""Command /customchannel get <name> <reactions> <threads>
Command to create a custom channel for a user.
"""
holo_user_ctx: HoloUser = await HoloUser.from_user(
ctx.user, cache=self.client.cache
)
# Return if the user is using the command outside of a guild
if not hasattr(ctx.author, "guild"):
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="Виконання за межами сервера не є можливим.",
color=Color.FAIL,
)
)
return
# Return if the user already has a custom channel
if holo_user_ctx.custom_channel is not None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.FAIL,
)
)
return
):
holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel == None:
await ctx.defer()
created_channel: TextChannel = await ctx.user.guild.create_text_channel(
created_channel = await ctx.user.guild.create_text_channel(
name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get(
ctx.author.guild.categories,
id=await config_get("custom_channels", "categories"),
id=await config_get("customchannels", "categories"),
),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
@@ -101,64 +62,55 @@ class CustomChannels(commands.Cog):
embed_links=True,
manage_channels=True,
)
await holo_user_ctx.set_custom_channel(
created_channel.id, cache=self.client.cache
)
holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond(
embed=Embed(
title="Створено канал",
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.SUCCESS,
color=Color.success,
)
)
bots: Dict[str, Any] = await config_get("bots")
bots = await config_get("bots")
for bot in bots:
role: Role | None = ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"])
if role is not None:
await created_channel.set_permissions(
role,
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
else:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.fail,
)
)
@custom_channel_group.command(
@customchannel.command(
name="edit",
description="Змінити параметри особистого каналу",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки")
async def custom_channel_edit_cmd(
async def customchannel_edit_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
) -> None:
"""Command /customchannel edit <name> <reactions> <threads>
):
holo_user_ctx = HoloUser(ctx.user)
Command to change properties of a custom channel.
"""
holo_user_ctx: HoloUser = await HoloUser.from_user(
ctx.user, cache=self.client.cache
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.custom_channel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.FAIL,
color=Color.fail,
)
)
return
await custom_channel.edit(name=name)
await custom_channel.set_permissions(
ctx.user.guild.default_role,
@@ -167,89 +119,68 @@ class CustomChannels(commands.Cog):
create_public_threads=threads,
create_private_threads=threads,
)
await ctx.respond(
embed=Embed(
title="Канал змінено",
description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`",
color=Color.FAIL,
color=Color.fail,
)
)
@custom_channel_group.command(
@customchannel.command(
name="remove",
description="Відібрати канал, знищуючи його, та частково повернути кошти",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
@option("confirm", description="Підтвердження операції")
async def custom_channel_remove_cmd(
async def customchannel_remove_cmd(
self, ctx: ApplicationContext, confirm: bool = False
) -> None:
"""Command /customchannel remove [<confirm>]
Command to remove a custom channel. Requires additional confirmation."""
holo_user_ctx: HoloUser = await HoloUser.from_user(
ctx.user, cache=self.client.cache
)
# Return if the user does not have a custom channel
if holo_user_ctx.custom_channel is None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас немає особистого каналу.",
color=Color.FAIL,
)
)
return
):
holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel is not None:
await ctx.defer()
custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.custom_channel
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.FAIL,
color=Color.fail,
)
)
await holo_user_ctx.remove_custom_channel(cache=self.client.cache)
holo_user_ctx.set("customchannel", None)
return
# Return if the confirmation is missing
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.FAIL,
color=Color.fail,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
await holo_user_ctx.remove_custom_channel(cache=self.client.cache)
try:
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.DEFAULT,
color=Color.default,
)
)
except Exception as exc:
logger.warning(
"Could not send a custom channel removal confirmation due to: %s", exc
else:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас немає особистого каналу.",
color=Color.fail,
)
)
def setup(client: HoloBot) -> None:
def setup(client: PycordBot):
client.add_cog(CustomChannels(client))

View File

@@ -1,160 +1,109 @@
import logging
from logging import Logger
from os import makedirs
from pathlib import Path
from typing import List, Dict, Any
from uuid import uuid4
from discord import ApplicationContext, Embed, File, option, Role, TextChannel
from discord import ApplicationContext, Embed, File, option
from discord import utils as ds_utils
from discord.commands import SlashCommandGroup
from discord.ext import commands
from libbot.utils import config_get, json_write
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser
from enums import Color
from modules.utils_sync import guild_name
from enums.colors import Color
from modules.database import col_users
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
logger: Logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class Data(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
def __init__(self, client: PycordBot):
self.client = client
data: SlashCommandGroup = SlashCommandGroup("data", "Керування даними користувачів")
data = SlashCommandGroup("data", "Керування даними користувачів")
@data.command(
name="export",
description="Експортувати дані",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None:
"""Command /data export <kind>
Command to export specific kind of data."""
async def data_export_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer()
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await HoloUser.is_council(ctx.author)
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.config["bot"]["owners"]) or (
await holo_user.is_council(ctx.author)
):
logging.info(
"User %s tried to use /export but permission denied",
guild_name(ctx.user),
"Moderator %s exported current users list", guild_name(ctx.user)
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
)
)
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
),
)
return
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
makedirs("tmp", exist_ok=True)
uuid: str = str(uuid4())
uuid = str(uuid4())
if kind == "Користувачі":
users: List[Dict[str, Any]] = []
users = []
for member in ctx.guild.members:
users.append(
{
"id": member.id,
"nick": member.nick,
"username": member.name,
"username": f"{member.name}#{member.discriminator}",
"bot": member.bot,
}
)
# Temporary file must be written synchronously,
# otherwise it will not be there when ctx.respond() is be called
# TODO Find a way to give this file to Pycord without FS operations
json_write(users, Path(f"tmp/{uuid}"))
json_write_sync(users, str(Path(f"tmp/{uuid}")))
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
# TODO Deprecate this command
@data.command(
name="migrate",
description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get("guild")],
await ctx.respond(
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None:
"""Command /migrate <kind>
Command to migrate specific kind of data.
Migration of users in this case means creation of their DB entries."""
await ctx.defer()
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await HoloUser.is_council(ctx.author)
):
else:
logging.info(
"User %s tried to use /migrate but permission denied",
"User %s tried to use /export but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
color=Color.fail,
)
)
mod_role: Role | None = ds_utils.get(
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
color=Color.fail,
),
)
return
@data.command(
name="migrate",
description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get_sync("guild")],
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.config["bot"]["owners"]) or (
await holo_user.is_council(ctx.author)
):
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
@@ -164,17 +113,55 @@ class Data(commands.Cog):
for member in ctx.guild.members:
if member.bot:
continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
await HoloUser.from_user(member, cache=self.client.cache)
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
await ctx.respond(
embed=Embed(
title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.SUCCESS,
color=Color.success,
)
)
else:
logging.info(
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
def setup(client: HoloBot) -> None:
def setup(client: PycordBot):
client.add_cog(Data(client))

View File

@@ -1,42 +1,40 @@
import logging
from logging import Logger
from discord import ApplicationContext, Embed, User, option, slash_command
from discord.ext import commands
from libbot.utils import config_get
from libbot.pycord.classes import PycordBot
from WaifuPicsPython import WaifuAsync
from classes.holo_bot import HoloBot
from modules.utils_sync import guild_name
from modules.waifu_pics import waifu_pics
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
logger: Logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
wafiu_pics = WaifuAsync()
class Fun(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
def __init__(self, client: PycordBot):
self.client = client
@slash_command(
name="action",
description="Провести над користувачем РП дію",
guild_ids=[config_get("guild")],
guild_ids=[config_get_sync("guild")],
)
@option(
"type",
description="Тип дії, яку хочете провести з користувачем",
choices=config_get("actions").keys(),
choices=config_get_sync("actions").keys(),
)
@option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None:
"""Command /action <type> <user>
Command to perform some RP action on a user and send them a GIF."""
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User):
await ctx.defer()
action: str = await config_get("category", "actions", type)
action_verb: str = await config_get("action", "actions", type)
action = await config_get("category", "actions", type)
action_verb = await config_get("action", "actions", type)
image_url: str = await waifu_pics.sfw(action)
image = await wafiu_pics.sfw(action)
logger.info(
"User %s (%s) %s %s (%s) with image %s",
@@ -45,17 +43,17 @@ class Fun(commands.Cog):
action_verb,
guild_name(user),
user.id,
image_url,
image,
)
embed: Embed = Embed(
embed = Embed(
description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**",
color=0x2F3136,
)
embed.set_image(url=image_url)
embed.set_image(url=image)
await ctx.respond(embed=embed)
def setup(client: HoloBot) -> None:
def setup(client: PycordBot):
client.add_cog(Fun(client))

View File

@@ -1,81 +1,67 @@
import logging
from logging import Logger
from discord import Member, Message, TextChannel, MessageType
from discord import Member, Message
from discord import utils as ds_utils
from discord.ext import commands
from libbot.utils import config_get
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser
from modules.database import col_users
logger: Logger = logging.getLogger(__name__)
from modules.utils import config_get
class Logger(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
def __init__(self, client: PycordBot):
self.client = client
@commands.Cog.listener()
async def on_message(self, message: Message):
"""Message listener. All actions on messages remain here for now."""
if (
(message.author != self.client.user)
and (message.author.bot is False)
and (message.author.system is False)
and (message.author.bot == False)
and (message.author.system == False)
):
await HoloUser.from_user(message.author, cache=self.client.cache)
if col_users.find_one({"user": message.author.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
if (
(message.type == MessageType.thread_created)
and (message.channel is not None)
and (
await col_users.count_documents({"custom_channel": message.channel.id})
> 0
)
):
try:
logger.info(
"Deleting the thread creation message in a custom channel %s",
message.channel.id,
)
await message.delete()
except Exception as exc:
logger.warning(
"Could not delete the thread creation message in a custom channel %s due to %s",
message.channel.id,
exc,
)
user["user"] = message.author.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
@commands.Cog.listener()
async def on_member_join(self, member: Member) -> None:
"""Member join handler. All actions on member join remain here for now."""
welcome_chan: TextChannel | None = ds_utils.get(
async def on_member_join(self, member: Member):
welcome_chan = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels,
id=await config_get("welcome", "channels", "text"),
)
rules_chan: TextChannel | None = ds_utils.get(
rules_chan = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels,
id=await config_get("rules", "channels", "text"),
)
if (
(member != self.client.user)
and (member.bot is False)
and (member.system is False)
and (member.bot == False)
and (member.system == False)
):
if welcome_chan is not None and rules_chan is not None:
await welcome_chan.send(
content=(await config_get("welcome", "messages")).format(
mention=member.mention, rules=rules_chan.mention
)
)
else:
logger.warning("Could not find a welcome and/or rules channel by id")
await HoloUser.from_user(member, cache=self.client.cache)
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
def setup(client: HoloBot) -> None:
def setup(client: PycordBot):
client.add_cog(Logger(client))

View File

@@ -1,58 +0,0 @@
import logging
from logging import Logger
from discord import Activity, ActivityType
from discord.ext import commands
from libbot.utils import config_get
from classes.holo_bot import HoloBot
logger: Logger = logging.getLogger(__name__)
class Utility(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Listener for the event when bot connects to Discord and becomes "ready"."""
logger.info("Logged in as %s", self.client.user)
activity_type: str = await config_get("type", "status")
activity_message: str = await config_get("message", "status")
if activity_type == "playing":
await self.client.change_presence(
activity=Activity(type=ActivityType.playing, name=activity_message)
)
elif activity_type == "watching":
await self.client.change_presence(
activity=Activity(type=ActivityType.watching, name=activity_message)
)
elif activity_type == "listening":
await self.client.change_presence(
activity=Activity(type=ActivityType.listening, name=activity_message)
)
elif activity_type == "streaming":
await self.client.change_presence(
activity=Activity(type=ActivityType.streaming, name=activity_message)
)
elif activity_type == "competing":
await self.client.change_presence(
activity=Activity(type=ActivityType.competing, name=activity_message)
)
elif activity_type == "custom":
await self.client.change_presence(
activity=Activity(type=ActivityType.custom, name=activity_message)
)
else:
return
logger.info(
"Set activity type to %s with message %s", activity_type, activity_message
)
def setup(client: HoloBot) -> None:
client.add_cog(Utility(client))

View File

@@ -22,24 +22,18 @@
"port": 27017,
"name": "holo_discord"
},
"cache": {
"type": null,
"memcached": {
"uri": "127.0.0.1:11211",
"prefix": null
},
"redis": {
"uri": "redis://127.0.0.1:6379/0",
"prefix": null
}
},
"logging": {
"size": 512,
"location": "logs"
},
"defaults": {},
"defaults": {
"user": {
"customrole": null,
"customchannel": null
}
},
"categories": {
"custom_channels": 0
"customchannels": 0
},
"channels": {
"text": {

View File

@@ -1 +0,0 @@
from .colors import Color

View File

@@ -1,7 +1,6 @@
from enum import IntEnum
class Color(IntEnum):
FAIL = 0xD6345B
SUCCESS = 0x84D961
DEFAULT = 0xA7A6AB
fail = 0xd6345b
success = 0x84d961
default = 0xa7a6ab

View File

@@ -1 +0,0 @@
from .user import UserNotFoundError

View File

@@ -1,10 +0,0 @@
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)

101
main.py
View File

@@ -1,71 +1,76 @@
import contextlib
import logging
import sys
from argparse import ArgumentParser
from logging import Logger
from pathlib import Path
from discord import LoginFailure, Intents
from libbot.utils import config_get
from discord import Activity, ActivityType
from classes.holo_bot import HoloBot
from modules.migrator import migrate_database
from modules.scheduler import scheduler
if not Path("config.json").exists():
print(
"Config file is missing: Make sure the configuration file 'config.json' is in place.",
flush=True,
)
sys.exit()
from modules.client import client
from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync
logging.basicConfig(
level=logging.INFO if not config_get("debug") else logging.DEBUG,
level=logging.INFO,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]",
)
logger: Logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
# Declare the parser that retrieves the command line arguments
parser = ArgumentParser(
prog="HoloUA Discord",
description="Discord bot for the HoloUA community.",
)
# Add a switch argument --migrate to be parsed...
parser.add_argument("--migrate", action="store_true")
# ...and parse the arguments we added
args = parser.parse_args()
# Try to import the module that improves performance
# and ignore errors when module is not installed
with contextlib.suppress(ImportError):
import uvloop
try:
import uvloop # type: ignore
uvloop.install()
except ImportError:
pass
def main() -> None:
# Perform migration if command line argument was provided
if args.migrate:
logger.info("Performing migrations...")
migrate_database()
@client.event
async def on_ready():
logger.info("Logged in as %s", client.user)
intents: Intents = Intents().all()
client: HoloBot = HoloBot(intents=intents, scheduler=scheduler)
activity_type = await config_get("type", "status")
activity_message = await config_get("message", "status")
if activity_type == "playing":
await client.change_presence(
activity=Activity(type=ActivityType.playing, name=activity_message)
)
elif activity_type == "watching":
await client.change_presence(
activity=Activity(type=ActivityType.watching, name=activity_message)
)
elif activity_type == "listening":
await client.change_presence(
activity=Activity(type=ActivityType.listening, name=activity_message)
)
elif activity_type == "streaming":
await client.change_presence(
activity=Activity(type=ActivityType.streaming, name=activity_message)
)
elif activity_type == "competing":
await client.change_presence(
activity=Activity(type=ActivityType.competing, name=activity_message)
)
elif activity_type == "custom":
await client.change_presence(
activity=Activity(type=ActivityType.custom, name=activity_message)
)
else:
return
logger.info(
"Set activity type to %s with message %s", activity_type, activity_message
)
def main():
client.load_extension("cogs")
try:
client.run(config_get("bot_token", "bot"))
except LoginFailure as exc:
logger.error("Provided bot token is invalid: %s", exc)
scheduler.start()
client.run(config_get_sync("token"))
except KeyboardInterrupt:
logger.info("KeyboardInterrupt received: Shutting down gracefully.")
finally:
sys.exit()
scheduler.shutdown()
exit()
if __name__ == "__main__":

View File

@@ -1,79 +0,0 @@
import logging
from logging import Logger
from libbot.utils import config_get, config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
# Categories
config_set(
"custom_channels",
config_get("customchannels", "categories"),
"categories",
)
config_delete("customchannels", "categories")
# User defaults
config_delete(
"user",
"defaults",
)
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"customchannel": {"$exists": True}},
{"$rename": {"customchannel": "custom_channel"}},
)
self.db.users.update_many(
{"customrole": {"$exists": True}},
{"$rename": {"customrole": "custom_role"}},
)
def downgrade(self):
try:
# Categories
config_set(
"customchannels",
config_get("custom_channels", "categories"),
"categories",
)
config_delete("custom_channels", "categories")
# User defaults
config_set(
"customrole",
None,
"defaults",
"user",
)
config_set(
"customchannel",
None,
"defaults",
"user",
)
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"custom_channel": {"$exists": True}},
{"$rename": {"custom_channel": "customchannel"}},
)
self.db.users.update_many(
{"custom_role": {"$exists": True}},
{"$rename": {"custom_role": "customrole"}},
)

View File

@@ -1,63 +0,0 @@
import logging
from logging import Logger
from libbot.utils import config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
config_set(
"cache",
{
"type": None,
"memcached": {"uri": "127.0.0.1:11211"},
"redis": {"uri": "redis://127.0.0.1:6379/0"},
},
*[],
)
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "id"}},
)
self.db.analytics.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "user_id"}},
)
self.db.warnings.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "user_id"}},
)
def downgrade(self):
try:
config_delete("cache", *[])
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"id": {"$exists": True}},
{"$rename": {"id": "user"}},
)
self.db.analytics.update_many(
{"user": {"$exists": True}},
{"$rename": {"user_id": "user"}},
)
self.db.warnings.update_many(
{"user": {"$exists": True}},
{"$rename": {"user_id": "user"}},
)

View File

@@ -1,31 +0,0 @@
import logging
from logging import Logger
from libbot.utils import config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
config_set("prefix", None, "cache", "memcached")
config_set("prefix", None, "cache", "redis")
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
def downgrade(self):
try:
config_delete("prefix", "cache", "redis")
config_delete("prefix", "cache", "memcached")
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)

6
modules/client.py Normal file
View File

@@ -0,0 +1,6 @@
from discord import Intents
from libbot.pycord.classes import PycordBot
intents = Intents().all()
intents.members = True
client = PycordBot(intents=intents)

View File

@@ -1,16 +1,12 @@
from typing import Dict, Any
from pymongo import MongoClient
from ujson import loads
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase
from libbot.utils import config_get
with open("config.json", "r", encoding="utf-8") as f:
db_config = loads(f.read())["database"]
f.close()
db_config: Dict[str, Any] = config_get("database")
con_string: str = (
"mongodb://{0}:{1}/{2}".format(
db_config["host"], db_config["port"], db_config["name"]
)
if db_config["user"] is None or db_config["password"] is None
else "mongodb://{0}:{1}@{2}:{3}/{4}".format(
db_client = MongoClient(
"mongodb://{0}:{1}@{2}:{3}/{4}".format(
db_config["user"],
db_config["password"],
db_config["host"],
@@ -18,14 +14,14 @@ con_string: str = (
db_config["name"],
)
)
db = db_client.get_database(name=db_config["name"])
db_client: AsyncClient = AsyncClient(con_string)
collections = db.list_collection_names()
# Async declarations per default
db: AsyncDatabase = db_client.get_database(name=db_config["name"])
for collection in ["users", "warnings", "scheduler", "analytics"]:
if not collection in collections:
db.create_collection(collection)
col_users: AsyncCollection = db.get_collection("users")
col_analytics: AsyncCollection = db.get_collection("analytics")
# Update indexes
db.dispatch.get_collection("users").create_index("id", unique=True)
col_users = db.get_collection("users")
col_warnings = db.get_collection("warnings")
col_analytics = db.get_collection("analytics")

View File

@@ -1,22 +0,0 @@
from typing import Any, Mapping
from libbot.utils import config_get
from mongodb_migrations.cli import MigrationManager
from mongodb_migrations.config import Configuration
def migrate_database() -> None:
"""Apply migrations from folder `migrations/` to the database"""
database_config: Mapping[str, Any] = config_get("database")
manager_config = Configuration(
{
"mongo_host": database_config["host"],
"mongo_port": database_config["port"],
"mongo_database": database_config["name"],
"mongo_username": database_config["user"],
"mongo_password": database_config["password"],
}
)
manager = MigrationManager(manager_config)
manager.run()

View File

@@ -1,3 +1,3 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler: AsyncIOScheduler = AsyncIOScheduler()
scheduler = AsyncIOScheduler()

38
modules/utils.py Normal file
View File

@@ -0,0 +1,38 @@
from typing import Any
import aiofiles
from ujson import dumps, loads
async def json_read(path: str) -> Any:
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: str) -> None:
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
)
async def config_get(key: str, *path: str) -> Any:
this_key = await json_read("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(key: str, value: Any, *path: str) -> None:
this_dict = await json_read("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
await json_write(this_dict, "config.json")
return

View File

@@ -1,8 +1,45 @@
from typing import Any, Union
from discord import Member, User
from ujson import dumps, loads
def guild_name(member: Member | User) -> str:
def json_read_sync(path: str) -> Any:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Union[Member, User]) -> str:
if isinstance(member, User):
return member.name
return member.name if member.nick is None else member.nick
if member.nick == None:
return member.name
else:
return member.nick

View File

@@ -1,3 +0,0 @@
from WaifuPicsPython import WaifuAsync
waifu_pics: WaifuAsync = WaifuAsync()

View File

@@ -1,13 +1,8 @@
# Waifu pics related dependencies (not listed directly by waifupics)
aiohttp>=3.10.0
requests>=2.32.2
aiofiles~=24.1.0
apscheduler>=3.10.0
async_pymongo==0.1.11
libbot[speed,pycord,cache]==4.4.0
mongodb-migrations==1.3.1
pymemcache~=4.0.0
redis~=6.4.0
ujson~=5.11.0
aiofiles==23.2.1
apscheduler==3.10.4
pymongo~=4.7.3
requests~=2.32.3
ujson~=5.10.0
WaifuPicsPython==0.2.0
--extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
libbot[speed,pycord]==3.2.2

View File

@@ -1,14 +1,14 @@
{
"$jsonSchema": {
"required": [
"user_id",
"user",
"channel",
"content",
"stickers",
"attachments"
],
"properties": {
"user_id": {
"user": {
"bsonType": "long",
"description": "Discord ID of user"
},
@@ -17,10 +17,7 @@
"description": "Discord ID of a channel"
},
"content": {
"bsonType": [
"null",
"string"
],
"bsonType": ["null", "string"],
"description": "Text of the message"
},
"stickers": {
@@ -43,7 +40,7 @@
"format": {
"bsonType": "array"
},
"user_id": {
"user": {
"bsonType": "string"
}
}

View File

@@ -1,27 +1,22 @@
{
"$jsonSchema": {
"required": [
"id",
"custom_role",
"custom_channel"
"user",
"customrole",
"customchannel"
],
"properties": {
"id": {
"user": {
"bsonType": "long",
"description": "Discord ID of user"
},
"custom_role": {
"bsonType": [
"null",
"long"
],
"customrole": {
"bsonType": ["null", "long"],
"description": "Discord ID of custom role or 'null' if not set"
},
"custom_channel": {
"bsonType": [
"null",
"long"
],
"customchannel": {
"bsonType": ["null", "long"],
"description": "Discord ID of custom channel or 'null' if not set"
}
}

18
validation/warnings.json Normal file
View File

@@ -0,0 +1,18 @@
{
"$jsonSchema": {
"required": [
"user",
"warns"
],
"properties": {
"user": {
"bsonType": "long",
"description": "Discord ID of user"
},
"warns": {
"bsonType": "int",
"description": "Number of warnings on count"
}
}
}
}