2 Commits

Author SHA1 Message Date
4afcbc93d5 Merge pull request 'Fixed minor issues' (#6) from dev into main
Reviewed-on: #6
2024-06-23 13:14:08 +03:00
72ccaa04a4 Merge pull request 'v0.1.0-rc.1' (#5) from dev into main
Reviewed-on: #5
2024-06-23 13:06:12 +03:00
33 changed files with 685 additions and 1233 deletions

View File

@@ -1,40 +0,0 @@
# Include any files or directories that you don't want to be copied to your
# container here (e.g., local build artifacts, temporary files, etc.).
#
# For more help, visit the .dockerignore file reference guide at
# https://docs.docker.com/go/build-context-dockerignore/
**/.DS_Store
**/__pycache__
**/.venv
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/bin
**/charts
**/docker-compose*
**/compose.y*ml
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md
config.json
.renovaterc
**/.idea
**/.mypy_cache
validation

View File

@@ -1,27 +0,0 @@
ARG PYTHON_VERSION=3.12.8
FROM python:${PYTHON_VERSION}-slim AS base
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
ARG UID=10001
RUN adduser \
--disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
appuser
RUN --mount=type=cache,target=/root/.cache/pip \
--mount=type=bind,source=requirements.txt,target=requirements.txt \
python -m pip install -r requirements.txt
USER appuser
COPY . .
ENTRYPOINT ["python", "main.py", "--migrate"]

121
README.md
View File

@@ -1,121 +1,2 @@
<h1 align="center">HoloBot Discord</h1> # HoloBotDiscord
<p align="center">Small Discord bot made on Py-Cord</p>
<p align="center">
<a href="https://git.end-play.xyz/HoloUA/Discord/src/branch/master/LICENSE"><img alt="License: GPL" src="https://img.shields.io/badge/License-GPL-blue"></a>
<a href="https://git.end-play.xyz/HoloUA/Discord"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
</p>
## Installation from release
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Install Python 3.11+
3. Download the [latest release](https://git.end-play.xyz/HoloUA/Discord/releases/latest)'s archive
4. Extract the archive
5. Navigate to the extracted folder and subfolder `Discord` in it
6. Create a virtual environment:
`python -m venv .venv` or `virtualenv .venv`
7. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
8. Install the dependencies:
`python -m pip install -r requirements.txt`
9. Run the bot with `python main.py` after completing the [configuration](#Configuration)
## Installation with Git
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Install Python 3.11+
3. Clone the repository:
`git clone https://git.end-play.xyz/HoloUA/Discord.git`
4. `cd Discord`
5. Create a virtual environment:
`python -m venv .venv` or `virtualenv .venv`
6. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
7. Install the dependencies:
`python -m pip install -r requirements.txt`
8. Run the bot with `python main.py` after completing the [configuration](#Configuration)
## Upgrading with Git
1. Go to the bot's directory
2. `git pull`
3. Activate the virtual environment:
Windows: `.venv\Scripts\activate.bat`
Linux/macOS: `.venv/bin/activate`
4. Update the dependencies:
`python -m pip install -r requirements.txt`
5. First start after the upgrade must initiate the migration:
`python main.py --migrate`
6. Now the bot is up to date and the next run will not require `--migrate` anymore
## Configuration
There's a file `config_example.json` which contains default configuration
and should be used as a base config.
Copy this file to `config.json` and open it with any text editor of your liking.
Modify the newly created configuration file to fit your needs.
Mandatory keys to modify:
- guild
- bot.owner
- bot.bot_token
- database.*
- categories.*
- channels.*
- roles.*
After all of that you're good to go! Happy using :)
## Caching
Although general database access speed is fast, caching might become helpful for
bigger servers with many bot interactions. Currently, Redis and Memcached are supported.
Configuration happens through the config key `caching`.
Set `caching.type` to the service of you choice ("redis" or "memcached") and then update
the URI to access the service. It's Redis' default URI format for Redis and "address:port"
for Memcached.
Which one should I choose?
| Service | Read/write speed | Config flexibility |
|-----------|------------------|--------------------|
| Redis | High | Very flexible |
| Memcached | Very high | Basic |
> Performance difference between Redis and Memcached is generally quite low, so your setup
> should normally depend more on the configuration flexibility than on raw speed.
## Debugging
There's a config key `debug` that can be set to `true` for debugging purposes.
It should be set to `false` in production, otherwise log becomes very verbose and might
contain data that shouldn't normally be logged.
## Docker [Experimental]
As an experiment, Docker deployment option has been added.
### Building the image
1. `git clone https://git.end-play.xyz/HoloUA/Discord.git`
2. `cd Discord`
3. `docker build -t holoua-discord .`
### Starting the bot
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation)
2. Download
the [configuration example file](https://git.end-play.xyz/HoloUA/Discord/src/branch/main/config_example.json) and
store it somewhere you would like your bot to access it
3. Complete the [configuration](#Configuration) step for this file
4. `docker run -d -v /path/to/config.json:/app/config.json holoua-discord`

View File

View File

@@ -1,28 +0,0 @@
import logging
from logging import Logger
from typing import Literal
from libbot.cache.classes import CacheMemcached, CacheRedis
from libbot.cache.manager import create_cache_client
from libbot.pycord.classes import PycordBot
logger: Logger = logging.getLogger(__name__)
class HoloBot(PycordBot):
cache: CacheMemcached | CacheRedis | None = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._set_cache_engine()
def _set_cache_engine(self) -> None:
cache_type: Literal["redis", "memcached"] | None = self.config["cache"]["type"]
if "cache" in self.config and cache_type is not None:
self.cache = create_cache_client(
self.config,
cache_type,
prefix=self.config["cache"][cache_type]["prefix"],
default_ttl_seconds=3600,
)

View File

@@ -1,227 +1,144 @@
import logging import logging
from logging import Logger from typing import Any, Union
from typing import Any, Dict, Optional
from bson import ObjectId import discord
from discord import User, Member import discord.member
from libbot.cache.classes import Cache
from libbot.utils import config_get
from pymongo.results import InsertOneResult
from errors import UserNotFoundError from modules.database import col_users, col_warnings
from modules.database import col_users from modules.utils import config_get
logger: Logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)
class HoloUser: class HoloUser:
def __init__( def __init__(
self, self, user: Union[discord.User, discord.Member, discord.member.Member, int]
_id: ObjectId,
id: int,
custom_role: int | None,
custom_channel: int | None,
) -> None: ) -> None:
self._id: ObjectId = _id
self.id: int = id
self.custom_role: int | None = custom_role
self.custom_channel: int | None = custom_channel
@classmethod
async def from_user(
cls,
user: User | Member,
allow_creation: bool = True,
cache: Optional[Cache] = None,
) -> "HoloUser":
"""Get an object that has a proper binding between Discord ID and database """Get an object that has a proper binding between Discord ID and database
### Args: ### Args:
* `user` (User | Member): Object from which an ID can be extracted * `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted
* `allow_creation` (bool, optional): Whether to allow creation of a new user record if none found. Defaults to True.
* `cache` (Cache, optional): Cache engine to get the cache from
### Raises: ### Raises:
* `UserNotFoundError`: User with such ID does not seem to exist in database * `UserNotFoundError`: User with such ID does not seem to exist in database
""" """
if cache is not None:
cached_entry: Dict[str, Any] | None = cache.get_json(f"user_{user.id}")
if cached_entry is not None: if hasattr(user, "id"):
return cls(**cached_entry) self.id = user.id # type: ignore
else:
self.id = user
db_entry: Dict[str, Any] | None = await col_users.find_one({"id": user.id}) jav_user = col_users.find_one({"user": self.id})
if db_entry is None: if jav_user is None:
if not allow_creation: raise UserNotFoundError(user=user, user_id=self.id)
raise UserNotFoundError(user=user, user_id=user.id)
db_entry = HoloUser.get_defaults(user.id) self.db_id = jav_user["_id"]
insert_result: InsertOneResult = await col_users.insert_one(db_entry) self.customrole = jav_user["customrole"]
self.customchannel = jav_user["customchannel"]
self.warnings = self.warns()
db_entry["_id"] = insert_result.inserted_id def warns(self) -> int:
"""Get number of warnings user has
if cache is not None: ### Returns:
cache.set_json(f"user_{user.id}", db_entry) * `int`: Number of warnings
"""
warns = col_warnings.find_one({"user": self.id})
if warns == None:
return 0
else:
return warns["warns"]
return cls(**db_entry) def warn(self, count=1, reason: str = "Not provided") -> None:
"""Warn and add count to warns number
@classmethod ### Args:
async def from_id(cls, user_id: int) -> "HoloUser": * `count` (int, optional): Count of warnings to be added. Defaults to 1.
raise NotImplementedError() """
warns = col_warnings.find_one({"user": self.id})
if warns != None:
col_warnings.update_one(
filter={"_id": self.db_id},
update={"$set": {"warns": warns["warns"] + count}},
)
else:
col_warnings.insert_one(document={"user": self.id, "warns": count})
logger.info(f"User {self.id} was warned {count} times due to: {reason}")
async def _set(self, key: str, value: Any, cache: Optional[Cache] = None) -> None: def set(self, key: str, value: Any) -> None:
"""Set attribute data and save it into the database """Set attribute data and save it into database
### Args: ### Args:
* `key` (str): Attribute to be changed * `key` (str): Attribute to be changed
* `value` (Any): Value to set * `value` (Any): Value to set
* `cache` (Cache, optional): Cache engine to write the update into
""" """
if not hasattr(self, key): if not hasattr(self, key):
raise AttributeError() raise AttributeError()
setattr(self, key, value) setattr(self, key, value)
col_users.update_one(
await col_users.update_one( filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
{"_id": self._id}, {"$set": {key: value}}, upsert=True
) )
logger.info(f"Set attribute {key} of user {self.id} to {value}")
self._update_cache(cache) async def is_moderator(
self, member: Union[discord.User, discord.Member, discord.member.Member]
logger.info("Set attribute '%s' of user %s to '%s'", key, self.id, value) ) -> bool:
async def _remove(self, key: str, cache: Optional[Cache] = None) -> None:
"""Remove attribute data and save it into the database
### Args:
* `key` (str): Attribute to be removed
* `cache` (Cache, optional): Cache engine to write the update into
"""
if not hasattr(self, key):
raise AttributeError()
default_value: Any = HoloUser.get_default_value(key)
setattr(self, key, default_value)
await col_users.update_one(
{"_id": self._id}, {"$set": {key: default_value}}, upsert=True
)
self._update_cache(cache)
logger.info("Removed attribute '%s' of user %s", key, self.id)
def _get_cache_key(self) -> str:
return f"user_{self.id}"
def _update_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
user_dict: Dict[str, Any] = self._to_dict()
if user_dict is not None:
cache.set_json(self._get_cache_key(), user_dict)
else:
self._delete_cache(cache)
def _delete_cache(self, cache: Optional[Cache] = None) -> None:
if cache is None:
return
cache.delete(self._get_cache_key())
@staticmethod
def get_defaults(user_id: int | None = None) -> Dict[str, Any]:
return {
"id": user_id,
"custom_role": None,
"custom_channel": None,
}
@staticmethod
def get_default_value(key: str) -> Any:
if key not in HoloUser.get_defaults():
raise KeyError(f"There's no default value for key '{key}' in HoloUser")
return HoloUser.get_defaults()[key]
def _to_dict(self) -> Dict[str, Any]:
return {
"_id": self._id,
"id": self.id,
"custom_role": self.custom_role,
"custom_channel": self.custom_channel,
}
async def set_custom_channel(
self, channel_id: int, cache: Optional[Cache] = None
) -> None:
await self._set("custom_channel", channel_id, cache=cache)
async def set_custom_role(
self, role_id: int, cache: Optional[Cache] = None
) -> None:
await self._set("custom_role", role_id, cache=cache)
async def remove_custom_channel(self, cache: Optional[Cache] = None) -> None:
await self._remove("custom_channel", cache=cache)
async def remove_custom_role(self, cache: Optional[Cache] = None) -> None:
await self._remove("custom_role", cache=cache)
async def purge(self, cache: Optional[Cache] = None) -> None:
"""Completely remove user data from database. Only removes the user record from users collection.
### Args:
* `cache` (Cache, optional): Cache engine to write the update into
"""
await col_users.delete_one({"_id": self._id})
self._delete_cache(cache)
@staticmethod
async def is_moderator(member: User | Member) -> bool:
"""Check if user is moderator or council member """Check if user is moderator or council member
### Args: ### Args:
* `member` (User | Member): Member object * `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
### Returns: ### Returns:
`bool`: `True` if member is a moderator or member of council and `False` if not `bool`: `True` if member is a moderator or member of council and `False` if not
""" """
if isinstance(member, User): if isinstance(member, discord.User):
return False return False
moderator_role = await config_get("moderators", "roles")
moderator_role: int | None = await config_get("moderators", "roles") council_role = await config_get("council", "roles")
council_role: int | None = await config_get("council", "roles")
for role in member.roles: for role in member.roles:
if role.id in (moderator_role, council_role): if role.id == moderator_role or role.id == council_role:
return True return True
return False return False
@staticmethod async def is_council(
async def is_council(member: User | Member) -> bool: self, member: Union[discord.User, discord.Member, discord.member.Member]
) -> bool:
"""Check if user is a member of council """Check if user is a member of council
### Args: ### Args:
* `member` (User | Member): Member object * `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
### Returns: ### Returns:
`bool`: `True` if member is a member of council and `False` if not `bool`: `True` if member is a member of council and `False` if not
""" """
if isinstance(member, User): if isinstance(member, discord.User):
return False return False
council_role = await config_get("council", "roles") council_role = await config_get("council", "roles")
for role in member.roles: for role in member.roles:
if role.id == council_role: if role.id == council_role:
return True return True
return False return False
# def purge(self) -> None:
# """Completely remove data from database. Will not remove transactions logs and warnings."""
# col_users.delete_one(filter={"_id": self.db_id})
# self.unauthorize()

View File

@@ -1,39 +1,108 @@
import logging import logging
import sys
from logging import Logger
from discord import ( from discord import ApplicationContext, Embed, User, option, slash_command
ApplicationContext,
Embed,
User,
option,
slash_command,
Role,
TextChannel,
)
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot.utils import config_get from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot from enums.colors import Color
from enums import Color from modules.scheduled import scheduler
from modules.scheduler import scheduler from modules.utils import config_get
from modules.utils_sync import guild_name from modules.utils_sync import config_get_sync, guild_name
from modules.waifu_pics import waifu_pics
logger: Logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Admin(commands.Cog): class Admin(commands.Cog):
"""Cog with utility commands for admins.""" def __init__(self, client: PycordBot):
self.client = client
def __init__(self, client: HoloBot): # @slash_command(
self.client: HoloBot = client # name="warning",
# description="Попередити юзера про порушення правил",
# guild_ids=[config_get_sync("guild")],
# )
# @option("user", description="Користувач")
# @option("reason", description="Причина")
# async def warn_cmd(
# self,
# ctx: ApplicationContext,
# user: User,
# reason: str = "Не вказана",
# ):
# logging.info(f"User {ctx.user.id} warned {user.id} for {reason}")
# await ctx.defer()
# jav_user = HoloUser(user)
# if ctx.user.id in await config_get("admins"):
# logging.info(
# f"Moderator {guild_name(ctx.user)} warned {guild_name(user)} for {reason} (has {jav_user.warnings} warns)"
# )
# if jav_user.warnings >= 5:
# logging.info(
# f"User {guild_name(user)} was banned due to a big amount of warns ({jav_user.warnings})"
# )
# await user.send(
# embed=Embed(
# title="Перманентне блокування",
# description=f"Вас було заблоковано за неодноразове порушення правил сервера.",
# color=Color.fail,
# )
# )
# await user.ban(reason=reason)
# elif jav_user.warnings >= 2:
# logging.info(
# f"User {guild_name(user)} was muted due to a big amount of warns ({jav_user.warnings})"
# )
# jav_user.warn(reason=reason)
# await user.send(
# embed=Embed(
# title="Тимчасове блокування",
# description=f"Причина: `{reason}`\n\nНа вашому рахунку вже {jav_user.warnings} попереджень. Вас було тимчасово заблоковано на **1 годину**.\n\nЯкщо Ви продовжите порушувати правила сервера згодом Вас заблокують.",
# color=0xDED56B,
# )
# )
# await user.timeout_for(timedelta(hours=1), reason=reason)
# else:
# jav_user.warn()
# await ctx.respond(
# embed=Embed(
# title="Попередження",
# description=f"{user.mention} Будь ласка, не порушуйте правила. Ви отримали попередження з причини `{reason}`.\n\nЯкщо Ви продовжите порушувати правила це може призвести до блокування в спільноті.",
# color=0xDED56B,
# )
# )
# else:
# logging.warning(
# f"User {guild_name(ctx.user)} tried to use /warn but permission denied"
# )
# await ctx.respond(
# embed=Embed(
# title="Відмовлено в доступі",
# description="Здається, це команда лише для модераторів",
# color=Color.fail,
# )
# )
# mod_role = ds_utils.get(
# ctx.user.guild.roles, id=await config_get("moderators", "roles")
# )
# admin_chan = ds_utils.get(
# ctx.user.guild.channels,
# id=await config_get("adminchat", "channels", "text"),
# )
# await admin_chan.send(
# content=f"{mod_role.mention}",
# embed=Embed(
# title="Неавторизований запит",
# description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
# color=Color.fail,
# ),
# )
@slash_command( @slash_command(
name="clear", name="clear",
description="Видалити деяку кількість повідомлень в каналі", description="Видалити деяку кількість повідомлень в каналі",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option("amount", description="Кількість") @option("amount", description="Кількість")
@option("user", description="Користувач", default=None) @option("user", description="Користувач", default=None)
@@ -42,11 +111,7 @@ class Admin(commands.Cog):
ctx: ApplicationContext, ctx: ApplicationContext,
amount: int, amount: int,
user: User, user: User,
) -> None: ):
"""Command /clear <amount> [<user>]
Removes last <amount> messages in the current channel. Optionally from a specific user.
"""
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
logging.info( logging.info(
"User %s removed %s message(s) in %s", "User %s removed %s message(s) in %s",
@@ -54,111 +119,89 @@ class Admin(commands.Cog):
amount, amount,
ctx.channel.id, ctx.channel.id,
) )
await ctx.respond( await ctx.respond(
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0 embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
) )
if user == None:
if user is None:
await ctx.channel.purge(limit=amount) await ctx.channel.purge(limit=amount)
else: else:
await ctx.channel.purge( await ctx.channel.purge(
limit=amount, check=lambda msg: msg.author == user limit=amount, check=lambda msg: msg.author == user
) )
else:
return logging.warning(
"User %s tried to use /clear but permission denied",
logging.warning( guild_name(ctx.user),
"User %s tried to use /clear but permission denied", )
guild_name(ctx.user), await ctx.respond(
) embed=Embed(
title="Відмовлено в доступі",
await ctx.respond( description="Здається, це команда лише для модераторів",
embed=Embed( color=Color.fail,
title="Відмовлено в доступі", )
description="Здається, це команда лише для модераторів", )
color=Color.FAIL, mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
) )
)
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send( await admin_chan.send(
content="" if mod_role is None else mod_role.mention, content=f"{mod_role.mention}",
embed=Embed( embed=Embed(
title="Неавторизований запит", title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL, color=Color.fail,
), ),
) )
@slash_command( @slash_command(
name="reboot", name="reboot",
description="Перезапустити бота", description="Перезапустити бота",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
async def reboot_cmd(self, ctx: ApplicationContext) -> None: async def reboot_cmd(self, ctx: ApplicationContext):
"""Command /reboot
Stops the bot. Is called "reboot" because it's assumed that the bot has automatic restart.
"""
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user)) logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Вимикаюсь...", title="Вимикаюсь...",
description="Спробую перезавантажитись за 5 секунд", description="Спробую перезавантажитись за 5 секунд",
) )
) )
scheduler.shutdown() scheduler.shutdown()
await self.client.close() await self.client.close()
await waifu_pics._client_session.close() exit()
else:
sys.exit() logging.warning(
"User %s tried to use /reboot but permission denied",
logging.warning( guild_name(ctx.user),
"User %s tried to use /reboot but permission denied", )
guild_name(ctx.user), await ctx.respond(
) embed=Embed(
title="Відмовлено в доступі",
await ctx.respond( description="Здається, це команда лише для модераторів",
embed=Embed( color=Color.fail,
title="Відмовлено в доступі", )
description="Здається, це команда лише для модераторів", )
color=Color.FAIL, mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
) )
)
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send( await admin_chan.send(
content="" if mod_role is None else mod_role.mention, content=f"{mod_role.mention}",
embed=Embed( embed=Embed(
title="Неавторизований запит", title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL, color=Color.fail,
), ),
) )
def setup(client: HoloBot) -> None: def setup(client: PycordBot):
client.add_cog(Admin(client)) client.add_cog(Admin(client))

View File

@@ -1,31 +1,26 @@
import logging import logging
from logging import Logger
from typing import Dict, List, Any
from discord import Cog, Message from discord import Cog, Message
from discord.ext import commands from discord.ext import commands
from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from modules.database import col_analytics from modules.database import col_analytics
logger: Logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Analytics(commands.Cog): class Analytics(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: PycordBot):
self.client: HoloBot = client self.client = client
@Cog.listener() @Cog.listener()
async def on_message(self, message: Message) -> None: async def on_message(self, message: Message):
"""Listener that collects analytical data (stickers, attachments, messages)."""
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot is False) and (message.author.bot == False)
and (message.author.system is False) and (message.author.system == False)
): ):
# Handle stickers stickers = []
stickers: List[Dict[str, Any]] = []
for sticker in message.stickers: for sticker in message.stickers:
stickers.append( stickers.append(
{ {
@@ -36,9 +31,7 @@ class Analytics(commands.Cog):
} }
) )
# Handle attachments attachments = []
attachments: List[Dict[str, Any]] = []
for attachment in message.attachments: for attachment in message.attachments:
attachments.append( attachments.append(
{ {
@@ -53,10 +46,9 @@ class Analytics(commands.Cog):
} }
) )
# Insert entry into the database col_analytics.insert_one(
await col_analytics.insert_one(
{ {
"user_id": message.author.id, "user": message.author.id,
"channel": message.channel.id, "channel": message.channel.id,
"content": message.content, "content": message.content,
"stickers": stickers, "stickers": stickers,
@@ -65,5 +57,5 @@ class Analytics(commands.Cog):
) )
def setup(client: HoloBot) -> None: def setup(client: PycordBot):
client.add_cog(Analytics(client)) client.add_cog(Analytics(client))

View File

@@ -1,164 +1,116 @@
import logging from discord import ApplicationContext, Embed, option
from logging import Logger
from typing import Any, Dict
from discord import ApplicationContext, Embed, option, TextChannel, Role
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.abc import GuildChannel from discord.abc import GuildChannel
from discord.commands import SlashCommandGroup from discord.commands import SlashCommandGroup
from discord.ext import commands from discord.ext import commands
from libbot.utils import config_get from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser from classes.holo_user import HoloUser
from enums import Color from enums.colors import Color
from modules.database import col_users from modules.database import col_users
from modules.utils_sync import guild_name from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
logger: Logger = logging.getLogger(__name__)
class CustomChannels(commands.Cog): class CustomChannels(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: PycordBot):
self.client: HoloBot = client self.client = client
@commands.Cog.listener() @commands.Cog.listener()
async def on_guild_channel_delete(self, channel: GuildChannel) -> None: async def on_guild_channel_delete(self, channel: GuildChannel):
await col_users.find_one_and_update( col_users.find_one_and_update(
{"custom_channel": channel.id}, {"$set": {"custom_channel": None}} {"customchannel": channel.id}, {"$set": {"customchannel": None}}
) )
custom_channel_group: SlashCommandGroup = SlashCommandGroup( customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом")
"customchannel", "Керування особистим каналом"
)
@custom_channel_group.command( @customchannel.command(
name="get", name="get",
description="Отримати персональний текстовий канал", description="Отримати персональний текстовий канал",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option("name", description="Назва каналу") @option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції") @option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки") @option("threads", description="Дозволити гілки")
async def custom_channel_get_cmd( async def customchannel_get_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
) -> None: ):
"""Command /customchannel get <name> <reactions> <threads> holo_user_ctx = HoloUser(ctx.user)
Command to create a custom channel for a user. if holo_user_ctx.customchannel == None:
""" await ctx.defer()
holo_user_ctx: HoloUser = await HoloUser.from_user( created_channel = await ctx.user.guild.create_text_channel(
ctx.user, cache=self.client.cache name=name,
) reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get(
# Return if the user is using the command outside of a guild ctx.author.guild.categories,
if not hasattr(ctx.author, "guild"): id=await config_get("customchannels", "categories"),
await ctx.defer(ephemeral=True) ),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
add_reactions=reactions,
create_public_threads=threads,
create_private_threads=threads,
)
await created_channel.set_permissions(
ctx.user,
attach_files=True,
manage_messages=True,
send_messages=True,
embed_links=True,
manage_channels=True,
)
holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Помилка виконання", title="Створено канал",
description="Виконання за межами сервера не є можливим.", description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.FAIL, color=Color.success,
) )
) )
return bots = await config_get("bots")
for bot in bots:
# Return if the user already has a custom channel await created_channel.set_permissions(
if holo_user_ctx.custom_channel is not None: ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
else:
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Помилка виконання", title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.", description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.FAIL, color=Color.fail,
) )
) )
return
await ctx.defer() @customchannel.command(
created_channel: TextChannel = await ctx.user.guild.create_text_channel(
name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get(
ctx.author.guild.categories,
id=await config_get("custom_channels", "categories"),
),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
add_reactions=reactions,
create_public_threads=threads,
create_private_threads=threads,
)
await created_channel.set_permissions(
ctx.user,
attach_files=True,
manage_messages=True,
send_messages=True,
embed_links=True,
manage_channels=True,
)
await holo_user_ctx.set_custom_channel(
created_channel.id, cache=self.client.cache
)
await ctx.respond(
embed=Embed(
title="Створено канал",
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.SUCCESS,
)
)
bots: Dict[str, Any] = await config_get("bots")
for bot in bots:
role: Role | None = ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"])
if role is not None:
await created_channel.set_permissions(
role,
view_channel=False,
)
@custom_channel_group.command(
name="edit", name="edit",
description="Змінити параметри особистого каналу", description="Змінити параметри особистого каналу",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option("name", description="Назва каналу") @option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції") @option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки") @option("threads", description="Дозволити гілки")
async def custom_channel_edit_cmd( async def customchannel_edit_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
) -> None: ):
"""Command /customchannel edit <name> <reactions> <threads> holo_user_ctx = HoloUser(ctx.user)
Command to change properties of a custom channel. custom_channel = ds_utils.get(
""" ctx.guild.channels, id=holo_user_ctx.customchannel
holo_user_ctx: HoloUser = await HoloUser.from_user(
ctx.user, cache=self.client.cache
) )
custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.custom_channel
)
# Return if the channel was not found
if custom_channel is None: if custom_channel is None:
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Канал не знайдено", title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.FAIL, color=Color.fail,
) )
) )
return return
await custom_channel.edit(name=name) await custom_channel.edit(name=name)
await custom_channel.set_permissions( await custom_channel.set_permissions(
ctx.user.guild.default_role, ctx.user.guild.default_role,
@@ -167,89 +119,68 @@ class CustomChannels(commands.Cog):
create_public_threads=threads, create_public_threads=threads,
create_private_threads=threads, create_private_threads=threads,
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Канал змінено", title="Канал змінено",
description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`", description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`",
color=Color.FAIL, color=Color.fail,
) )
) )
@custom_channel_group.command( @customchannel.command(
name="remove", name="remove",
description="Відібрати канал, знищуючи його, та частково повернути кошти", description="Відібрати канал, знищуючи його, та частково повернути кошти",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option("confirm", description="Підтвердження операції") @option("confirm", description="Підтвердження операції")
async def custom_channel_remove_cmd( async def customchannel_remove_cmd(
self, ctx: ApplicationContext, confirm: bool = False self, ctx: ApplicationContext, confirm: bool = False
) -> None: ):
"""Command /customchannel remove [<confirm>] holo_user_ctx = HoloUser(ctx.user)
Command to remove a custom channel. Requires additional confirmation.""" if holo_user_ctx.customchannel is not None:
holo_user_ctx: HoloUser = await HoloUser.from_user( await ctx.defer()
ctx.user, cache=self.client.cache custom_channel = ds_utils.get(
) ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the user does not have a custom channel if custom_channel is None:
if holo_user_ctx.custom_channel is None: await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail,
)
)
holo_user_ctx.set("customchannel", None)
return
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.fail,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.default,
)
)
else:
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Помилка виконання", title="Помилка виконання",
description="У вас немає особистого каналу.", description="У вас немає особистого каналу.",
color=Color.FAIL, color=Color.fail,
) )
) )
return
await ctx.defer()
custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.custom_channel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.FAIL,
)
)
await holo_user_ctx.remove_custom_channel(cache=self.client.cache)
return
# Return if the confirmation is missing
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.FAIL,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
await holo_user_ctx.remove_custom_channel(cache=self.client.cache)
try:
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.DEFAULT,
)
)
except Exception as exc:
logger.warning(
"Could not send a custom channel removal confirmation due to: %s", exc
)
def setup(client: HoloBot) -> None: def setup(client: PycordBot):
client.add_cog(CustomChannels(client)) client.add_cog(CustomChannels(client))

View File

@@ -1,180 +1,167 @@
import logging import logging
from logging import Logger
from os import makedirs from os import makedirs
from pathlib import Path from pathlib import Path
from typing import List, Dict, Any
from uuid import uuid4 from uuid import uuid4
from discord import ApplicationContext, Embed, File, option, Role, TextChannel from discord import ApplicationContext, Embed, File, option
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.commands import SlashCommandGroup from discord.commands import SlashCommandGroup
from discord.ext import commands from discord.ext import commands
from libbot.utils import config_get, json_write from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser from classes.holo_user import HoloUser
from enums import Color from enums.colors import Color
from modules.utils_sync import guild_name from modules.database import col_users
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
logger: Logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Data(commands.Cog): class Data(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: PycordBot):
self.client: HoloBot = client self.client = client
data: SlashCommandGroup = SlashCommandGroup("data", "Керування даними користувачів") data = SlashCommandGroup("data", "Керування даними користувачів")
@data.command( @data.command(
name="export", name="export",
description="Експортувати дані", description="Експортувати дані",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option( @option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None: async def data_export_cmd(self, ctx: ApplicationContext, kind: str):
"""Command /data export <kind>
Command to export specific kind of data."""
await ctx.defer() await ctx.defer()
holo_user = HoloUser(ctx.author)
# Return if the user is not an owner and not in the council if (ctx.user.id in self.client.owner_ids) or (
if (ctx.user.id not in self.client.owner_ids) and not ( await holo_user.is_council(ctx.author)
await HoloUser.is_council(ctx.author)
): ):
logging.info(
"Moderator %s exported current users list", guild_name(ctx.user)
)
makedirs("tmp", exist_ok=True)
uuid = str(uuid4())
if kind == "Користувачі":
users = []
for member in ctx.guild.members:
users.append(
{
"id": member.id,
"nick": member.nick,
"username": f"{member.name}#{member.discriminator}",
"bot": member.bot,
}
)
json_write_sync(users, str(Path(f"tmp/{uuid}")))
await ctx.respond(
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
)
else:
logging.info( logging.info(
"User %s tried to use /export but permission denied", "User %s tried to use /export but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Відмовлено в доступі", title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів", description="Здається, це команда лише для модераторів",
color=Color.FAIL, color=Color.fail,
) )
) )
mod_role = ds_utils.get(
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan: TextChannel | None = ds_utils.get( admin_chan = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
await admin_chan.send( await admin_chan.send(
content="" if mod_role is None else mod_role.mention, content=f"{mod_role.mention}",
embed=Embed( embed=Embed(
title="Неавторизований запит", title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL, color=Color.fail,
), ),
) )
return
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
makedirs("tmp", exist_ok=True)
uuid: str = str(uuid4())
if kind == "Користувачі":
users: List[Dict[str, Any]] = []
for member in ctx.guild.members:
users.append(
{
"id": member.id,
"nick": member.nick,
"username": member.name,
"bot": member.bot,
}
)
# Temporary file must be written synchronously,
# otherwise it will not be there when ctx.respond() is be called
# TODO Find a way to give this file to Pycord without FS operations
json_write(users, Path(f"tmp/{uuid}"))
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
# TODO Deprecate this command
@data.command( @data.command(
name="migrate", name="migrate",
description="Мігрувати всіх користувачів до бази", description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option( @option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None: async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
"""Command /migrate <kind>
Command to migrate specific kind of data.
Migration of users in this case means creation of their DB entries."""
await ctx.defer() await ctx.defer()
holo_user = HoloUser(ctx.author)
# Return if the user is not an owner and not in the council if (ctx.user.id in self.client.owner_ids) or (
if (ctx.user.id not in self.client.owner_ids) and not ( await holo_user.is_council(ctx.author)
await HoloUser.is_council(ctx.author)
): ):
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
)
if kind == "Користувачі":
for member in ctx.guild.members:
if member.bot:
continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
await ctx.respond(
embed=Embed(
title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.success,
)
)
else:
logging.info( logging.info(
"User %s tried to use /migrate but permission denied", "User %s tried to use /migrate but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Відмовлено в доступі", title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів", description="Здається, це команда лише для модераторів",
color=Color.FAIL, color=Color.fail,
) )
) )
mod_role = ds_utils.get(
mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan: TextChannel | None = ds_utils.get( admin_chan = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
await admin_chan.send(
if admin_chan is not None: content=f"{mod_role.mention}",
await admin_chan.send( embed=Embed(
content="" if mod_role is None else mod_role.mention, title="Неавторизований запит",
embed=Embed( description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
title="Неавторизований запит", color=Color.fail,
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", ),
color=Color.FAIL,
),
)
return
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
)
if kind == "Користувачі":
for member in ctx.guild.members:
if member.bot:
continue
await HoloUser.from_user(member, cache=self.client.cache)
await ctx.respond(
embed=Embed(
title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.SUCCESS,
) )
)
def setup(client: HoloBot) -> None: def setup(client: PycordBot):
client.add_cog(Data(client)) client.add_cog(Data(client))

View File

@@ -1,42 +1,40 @@
import logging import logging
from logging import Logger
from discord import ApplicationContext, Embed, User, option, slash_command from discord import ApplicationContext, Embed, User, option, slash_command
from discord.ext import commands from discord.ext import commands
from libbot.utils import config_get from libbot.pycord.classes import PycordBot
from WaifuPicsPython import WaifuAsync
from classes.holo_bot import HoloBot from modules.utils import config_get
from modules.utils_sync import guild_name from modules.utils_sync import config_get_sync, guild_name
from modules.waifu_pics import waifu_pics
logger: Logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
wafiu_pics = WaifuAsync()
class Fun(commands.Cog): class Fun(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: PycordBot):
self.client: HoloBot = client self.client = client
@slash_command( @slash_command(
name="action", name="action",
description="Провести над користувачем РП дію", description="Провести над користувачем РП дію",
guild_ids=[config_get("guild")], guild_ids=[config_get_sync("guild")],
) )
@option( @option(
"type", "type",
description="Тип дії, яку хочете провести з користувачем", description="Тип дії, яку хочете провести з користувачем",
choices=config_get("actions").keys(), choices=config_get_sync("actions").keys(),
) )
@option("user", description="Користувач") @option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None: async def action_cmd(self, ctx: ApplicationContext, type: str, user: User):
"""Command /action <type> <user>
Command to perform some RP action on a user and send them a GIF."""
await ctx.defer() await ctx.defer()
action: str = await config_get("category", "actions", type) action = await config_get("category", "actions", type)
action_verb: str = await config_get("action", "actions", type) action_verb = await config_get("action", "actions", type)
image_url: str = await waifu_pics.sfw(action) image = await wafiu_pics.sfw(action)
logger.info( logger.info(
"User %s (%s) %s %s (%s) with image %s", "User %s (%s) %s %s (%s) with image %s",
@@ -45,17 +43,17 @@ class Fun(commands.Cog):
action_verb, action_verb,
guild_name(user), guild_name(user),
user.id, user.id,
image_url, image,
) )
embed: Embed = Embed( embed = Embed(
description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**", description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**",
color=0x2F3136, color=0x2F3136,
) )
embed.set_image(url=image_url) embed.set_image(url=image)
await ctx.respond(embed=embed) await ctx.respond(embed=embed)
def setup(client: HoloBot) -> None: def setup(client: PycordBot):
client.add_cog(Fun(client)) client.add_cog(Fun(client))

View File

@@ -1,81 +1,67 @@
import logging from discord import Member, Message
from logging import Logger
from discord import Member, Message, TextChannel, MessageType
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot.utils import config_get from libbot.pycord.classes import PycordBot
from classes.holo_bot import HoloBot
from classes.holo_user import HoloUser
from modules.database import col_users from modules.database import col_users
from modules.utils import config_get
logger: Logger = logging.getLogger(__name__)
class Logger(commands.Cog): class Logger(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: PycordBot):
self.client: HoloBot = client self.client = client
@commands.Cog.listener() @commands.Cog.listener()
async def on_message(self, message: Message): async def on_message(self, message: Message):
"""Message listener. All actions on messages remain here for now."""
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot is False) and (message.author.bot == False)
and (message.author.system is False) and (message.author.system == False)
): ):
await HoloUser.from_user(message.author, cache=self.client.cache) if col_users.find_one({"user": message.author.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
if ( user["user"] = message.author.id
(message.type == MessageType.thread_created)
and (message.channel is not None) for key in defaults:
and ( user[key] = defaults[key]
await col_users.count_documents({"custom_channel": message.channel.id})
> 0 col_users.insert_one(document=user)
)
):
try:
logger.info(
"Deleting the thread creation message in a custom channel %s",
message.channel.id,
)
await message.delete()
except Exception as exc:
logger.warning(
"Could not delete the thread creation message in a custom channel %s due to %s",
message.channel.id,
exc,
)
@commands.Cog.listener() @commands.Cog.listener()
async def on_member_join(self, member: Member) -> None: async def on_member_join(self, member: Member):
"""Member join handler. All actions on member join remain here for now.""" welcome_chan = ds_utils.get(
welcome_chan: TextChannel | None = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels, self.client.get_guild(await config_get("guild")).channels,
id=await config_get("welcome", "channels", "text"), id=await config_get("welcome", "channels", "text"),
) )
rules_chan: TextChannel | None = ds_utils.get( rules_chan = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels, self.client.get_guild(await config_get("guild")).channels,
id=await config_get("rules", "channels", "text"), id=await config_get("rules", "channels", "text"),
) )
if ( if (
(member != self.client.user) (member != self.client.user)
and (member.bot is False) and (member.bot == False)
and (member.system is False) and (member.system == False)
): ):
if welcome_chan is not None and rules_chan is not None: await welcome_chan.send(
await welcome_chan.send( content=(await config_get("welcome", "messages")).format(
content=(await config_get("welcome", "messages")).format( mention=member.mention, rules=rules_chan.mention
mention=member.mention, rules=rules_chan.mention
)
) )
else: )
logger.warning("Could not find a welcome and/or rules channel by id")
await HoloUser.from_user(member, cache=self.client.cache) if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
def setup(client: HoloBot) -> None: def setup(client: PycordBot):
client.add_cog(Logger(client)) client.add_cog(Logger(client))

View File

@@ -1,58 +0,0 @@
import logging
from logging import Logger
from discord import Activity, ActivityType
from discord.ext import commands
from libbot.utils import config_get
from classes.holo_bot import HoloBot
logger: Logger = logging.getLogger(__name__)
class Utility(commands.Cog):
def __init__(self, client: HoloBot):
self.client: HoloBot = client
@commands.Cog.listener()
async def on_ready(self) -> None:
"""Listener for the event when bot connects to Discord and becomes "ready"."""
logger.info("Logged in as %s", self.client.user)
activity_type: str = await config_get("type", "status")
activity_message: str = await config_get("message", "status")
if activity_type == "playing":
await self.client.change_presence(
activity=Activity(type=ActivityType.playing, name=activity_message)
)
elif activity_type == "watching":
await self.client.change_presence(
activity=Activity(type=ActivityType.watching, name=activity_message)
)
elif activity_type == "listening":
await self.client.change_presence(
activity=Activity(type=ActivityType.listening, name=activity_message)
)
elif activity_type == "streaming":
await self.client.change_presence(
activity=Activity(type=ActivityType.streaming, name=activity_message)
)
elif activity_type == "competing":
await self.client.change_presence(
activity=Activity(type=ActivityType.competing, name=activity_message)
)
elif activity_type == "custom":
await self.client.change_presence(
activity=Activity(type=ActivityType.custom, name=activity_message)
)
else:
return
logger.info(
"Set activity type to %s with message %s", activity_type, activity_message
)
def setup(client: HoloBot) -> None:
client.add_cog(Utility(client))

View File

@@ -22,24 +22,18 @@
"port": 27017, "port": 27017,
"name": "holo_discord" "name": "holo_discord"
}, },
"cache": {
"type": null,
"memcached": {
"uri": "127.0.0.1:11211",
"prefix": null
},
"redis": {
"uri": "redis://127.0.0.1:6379/0",
"prefix": null
}
},
"logging": { "logging": {
"size": 512, "size": 512,
"location": "logs" "location": "logs"
}, },
"defaults": {}, "defaults": {
"user": {
"customrole": null,
"customchannel": null
}
},
"categories": { "categories": {
"custom_channels": 0 "customchannels": 0
}, },
"channels": { "channels": {
"text": { "text": {

View File

@@ -1 +0,0 @@
from .colors import Color

View File

@@ -1,7 +1,6 @@
from enum import IntEnum from enum import IntEnum
class Color(IntEnum): class Color(IntEnum):
FAIL = 0xD6345B fail = 0xd6345b
SUCCESS = 0x84D961 success = 0x84d961
DEFAULT = 0xA7A6AB default = 0xa7a6ab

View File

@@ -1 +0,0 @@
from .user import UserNotFoundError

View File

@@ -1,10 +0,0 @@
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)

101
main.py
View File

@@ -1,71 +1,76 @@
import contextlib
import logging import logging
import sys
from argparse import ArgumentParser
from logging import Logger
from pathlib import Path
from discord import LoginFailure, Intents from discord import Activity, ActivityType
from libbot.utils import config_get
from classes.holo_bot import HoloBot from modules.client import client
from modules.migrator import migrate_database from modules.scheduled import scheduler
from modules.scheduler import scheduler from modules.utils import config_get
from modules.utils_sync import config_get_sync
if not Path("config.json").exists():
print(
"Config file is missing: Make sure the configuration file 'config.json' is in place.",
flush=True,
)
sys.exit()
logging.basicConfig( logging.basicConfig(
level=logging.INFO if not config_get("debug") else logging.DEBUG, level=logging.INFO,
format="%(name)s.%(funcName)s | %(levelname)s | %(message)s", format="%(name)s.%(funcName)s | %(levelname)s | %(message)s",
datefmt="[%X]", datefmt="[%X]",
) )
logger: Logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Declare the parser that retrieves the command line arguments try:
parser = ArgumentParser( import uvloop # type: ignore
prog="HoloUA Discord",
description="Discord bot for the HoloUA community.",
)
# Add a switch argument --migrate to be parsed...
parser.add_argument("--migrate", action="store_true")
# ...and parse the arguments we added
args = parser.parse_args()
# Try to import the module that improves performance
# and ignore errors when module is not installed
with contextlib.suppress(ImportError):
import uvloop
uvloop.install() uvloop.install()
except ImportError:
pass
def main() -> None: @client.event
# Perform migration if command line argument was provided async def on_ready():
if args.migrate: logger.info("Logged in as %s", client.user)
logger.info("Performing migrations...")
migrate_database()
intents: Intents = Intents().all() activity_type = await config_get("type", "status")
client: HoloBot = HoloBot(intents=intents, scheduler=scheduler) activity_message = await config_get("message", "status")
if activity_type == "playing":
await client.change_presence(
activity=Activity(type=ActivityType.playing, name=activity_message)
)
elif activity_type == "watching":
await client.change_presence(
activity=Activity(type=ActivityType.watching, name=activity_message)
)
elif activity_type == "listening":
await client.change_presence(
activity=Activity(type=ActivityType.listening, name=activity_message)
)
elif activity_type == "streaming":
await client.change_presence(
activity=Activity(type=ActivityType.streaming, name=activity_message)
)
elif activity_type == "competing":
await client.change_presence(
activity=Activity(type=ActivityType.competing, name=activity_message)
)
elif activity_type == "custom":
await client.change_presence(
activity=Activity(type=ActivityType.custom, name=activity_message)
)
else:
return
logger.info(
"Set activity type to %s with message %s", activity_type, activity_message
)
def main():
client.load_extension("cogs") client.load_extension("cogs")
try: try:
client.run(config_get("bot_token", "bot")) scheduler.start()
except LoginFailure as exc: client.run(config_get_sync("bot_token", "bot"))
logger.error("Provided bot token is invalid: %s", exc)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("KeyboardInterrupt received: Shutting down gracefully.") scheduler.shutdown()
finally: exit()
sys.exit()
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -1,79 +0,0 @@
import logging
from logging import Logger
from libbot.utils import config_get, config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
# Categories
config_set(
"custom_channels",
config_get("customchannels", "categories"),
"categories",
)
config_delete("customchannels", "categories")
# User defaults
config_delete(
"user",
"defaults",
)
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"customchannel": {"$exists": True}},
{"$rename": {"customchannel": "custom_channel"}},
)
self.db.users.update_many(
{"customrole": {"$exists": True}},
{"$rename": {"customrole": "custom_role"}},
)
def downgrade(self):
try:
# Categories
config_set(
"customchannels",
config_get("custom_channels", "categories"),
"categories",
)
config_delete("custom_channels", "categories")
# User defaults
config_set(
"customrole",
None,
"defaults",
"user",
)
config_set(
"customchannel",
None,
"defaults",
"user",
)
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"custom_channel": {"$exists": True}},
{"$rename": {"custom_channel": "customchannel"}},
)
self.db.users.update_many(
{"custom_role": {"$exists": True}},
{"$rename": {"custom_role": "customrole"}},
)

View File

@@ -1,63 +0,0 @@
import logging
from logging import Logger
from libbot.utils import config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
config_set(
"cache",
{
"type": None,
"memcached": {"uri": "127.0.0.1:11211"},
"redis": {"uri": "redis://127.0.0.1:6379/0"},
},
*[],
)
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "id"}},
)
self.db.analytics.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "user_id"}},
)
self.db.warnings.update_many(
{"user": {"$exists": True}},
{"$rename": {"user": "user_id"}},
)
def downgrade(self):
try:
config_delete("cache", *[])
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
self.db.users.update_many(
{"id": {"$exists": True}},
{"$rename": {"id": "user"}},
)
self.db.analytics.update_many(
{"user": {"$exists": True}},
{"$rename": {"user_id": "user"}},
)
self.db.warnings.update_many(
{"user": {"$exists": True}},
{"$rename": {"user_id": "user"}},
)

View File

@@ -1,31 +0,0 @@
import logging
from logging import Logger
from libbot.utils import config_set, config_delete
from mongodb_migrations.base import BaseMigration
logger: Logger = logging.getLogger(__name__)
class Migration(BaseMigration):
def upgrade(self):
try:
config_set("prefix", None, "cache", "memcached")
config_set("prefix", None, "cache", "redis")
except Exception as exc:
logger.error(
"Could not upgrade the config during migration '%s' due to: %s",
__name__,
exc,
)
def downgrade(self):
try:
config_delete("prefix", "cache", "redis")
config_delete("prefix", "cache", "memcached")
except Exception as exc:
logger.error(
"Could not downgrade the config during migration '%s' due to: %s",
__name__,
exc,
)

6
modules/client.py Normal file
View File

@@ -0,0 +1,6 @@
from discord import Intents
from libbot.pycord.classes import PycordBot
intents = Intents().all()
intents.members = True
client = PycordBot(intents=intents)

View File

@@ -1,16 +1,12 @@
from typing import Dict, Any from pymongo import MongoClient
from ujson import loads
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase with open("config.json", "r", encoding="utf-8") as f:
from libbot.utils import config_get db_config = loads(f.read())["database"]
f.close()
db_config: Dict[str, Any] = config_get("database") db_client = MongoClient(
"mongodb://{0}:{1}@{2}:{3}/{4}".format(
con_string: str = (
"mongodb://{0}:{1}/{2}".format(
db_config["host"], db_config["port"], db_config["name"]
)
if db_config["user"] is None or db_config["password"] is None
else "mongodb://{0}:{1}@{2}:{3}/{4}".format(
db_config["user"], db_config["user"],
db_config["password"], db_config["password"],
db_config["host"], db_config["host"],
@@ -18,14 +14,14 @@ con_string: str = (
db_config["name"], db_config["name"],
) )
) )
db = db_client.get_database(name=db_config["name"])
db_client: AsyncClient = AsyncClient(con_string) collections = db.list_collection_names()
# Async declarations per default for collection in ["users", "warnings", "scheduler", "analytics"]:
db: AsyncDatabase = db_client.get_database(name=db_config["name"]) if not collection in collections:
db.create_collection(collection)
col_users: AsyncCollection = db.get_collection("users") col_users = db.get_collection("users")
col_analytics: AsyncCollection = db.get_collection("analytics") col_warnings = db.get_collection("warnings")
col_analytics = db.get_collection("analytics")
# Update indexes
db.dispatch.get_collection("users").create_index("id", unique=True)

View File

@@ -1,22 +0,0 @@
from typing import Any, Mapping
from libbot.utils import config_get
from mongodb_migrations.cli import MigrationManager
from mongodb_migrations.config import Configuration
def migrate_database() -> None:
"""Apply migrations from folder `migrations/` to the database"""
database_config: Mapping[str, Any] = config_get("database")
manager_config = Configuration(
{
"mongo_host": database_config["host"],
"mongo_port": database_config["port"],
"mongo_database": database_config["name"],
"mongo_username": database_config["user"],
"mongo_password": database_config["password"],
}
)
manager = MigrationManager(manager_config)
manager.run()

View File

@@ -1,3 +1,3 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler: AsyncIOScheduler = AsyncIOScheduler() scheduler = AsyncIOScheduler()

38
modules/utils.py Normal file
View File

@@ -0,0 +1,38 @@
from typing import Any
import aiofiles
from ujson import dumps, loads
async def json_read(path: str) -> Any:
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: str) -> None:
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
)
async def config_get(key: str, *path: str) -> Any:
this_key = await json_read("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(key: str, value: Any, *path: str) -> None:
this_dict = await json_read("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
await json_write(this_dict, "config.json")
return

View File

@@ -1,8 +1,45 @@
from typing import Any, Union
from discord import Member, User from discord import Member, User
from ujson import dumps, loads
def guild_name(member: Member | User) -> str: def json_read_sync(path: str) -> Any:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Union[Member, User]) -> str:
if isinstance(member, User): if isinstance(member, User):
return member.name return member.name
if member.nick == None:
return member.name if member.nick is None else member.nick return member.name
else:
return member.nick

View File

@@ -1,3 +0,0 @@
from WaifuPicsPython import WaifuAsync
waifu_pics: WaifuAsync = WaifuAsync()

View File

@@ -1,13 +1,8 @@
# Waifu pics related dependencies (not listed directly by waifupics) aiofiles==23.2.1
aiohttp>=3.10.0 apscheduler==3.10.4
requests>=2.32.2 pymongo~=4.7.3
requests~=2.32.3
aiofiles~=24.1.0 ujson~=5.10.0
apscheduler>=3.10.0 WaifuPicsPython==0.2.0
async_pymongo==0.1.11 --extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
libbot[speed,pycord,cache]==4.4.0 libbot[speed,pycord]==3.2.2
mongodb-migrations==1.3.1
pymemcache~=4.0.0
redis~=6.4.0
ujson~=5.11.0
WaifuPicsPython==0.2.0

View File

@@ -1,14 +1,14 @@
{ {
"$jsonSchema": { "$jsonSchema": {
"required": [ "required": [
"user_id", "user",
"channel", "channel",
"content", "content",
"stickers", "stickers",
"attachments" "attachments"
], ],
"properties": { "properties": {
"user_id": { "user": {
"bsonType": "long", "bsonType": "long",
"description": "Discord ID of user" "description": "Discord ID of user"
}, },
@@ -17,10 +17,7 @@
"description": "Discord ID of a channel" "description": "Discord ID of a channel"
}, },
"content": { "content": {
"bsonType": [ "bsonType": ["null", "string"],
"null",
"string"
],
"description": "Text of the message" "description": "Text of the message"
}, },
"stickers": { "stickers": {
@@ -43,7 +40,7 @@
"format": { "format": {
"bsonType": "array" "bsonType": "array"
}, },
"user_id": { "user": {
"bsonType": "string" "bsonType": "string"
} }
} }

View File

@@ -1,27 +1,22 @@
{ {
"$jsonSchema": { "$jsonSchema": {
"required": [ "required": [
"id", "user",
"custom_role", "customrole",
"custom_channel" "customchannel"
], ],
"properties": { "properties": {
"id": { "user": {
"bsonType": "long", "bsonType": "long",
"description": "Discord ID of user" "description": "Discord ID of user"
}, },
"custom_role": { "customrole": {
"bsonType": [ "bsonType": ["null", "long"],
"null",
"long"
],
"description": "Discord ID of custom role or 'null' if not set" "description": "Discord ID of custom role or 'null' if not set"
}, },
"custom_channel": { "customchannel": {
"bsonType": [ "bsonType": ["null", "long"],
"null",
"long"
],
"description": "Discord ID of custom channel or 'null' if not set" "description": "Discord ID of custom channel or 'null' if not set"
} }
} }

18
validation/warnings.json Normal file
View File

@@ -0,0 +1,18 @@
{
"$jsonSchema": {
"required": [
"user",
"warns"
],
"properties": {
"user": {
"bsonType": "long",
"description": "Discord ID of user"
},
"warns": {
"bsonType": "int",
"description": "Number of warnings on count"
}
}
}
}