Merge pull request 'v0.1.0-rc.3' (#15) from dev into main

Reviewed-on: #15
This commit is contained in:
Profitroll 2024-12-16 22:08:22 +02:00
commit 3010dc02bc
20 changed files with 562 additions and 465 deletions

View File

@ -1,2 +1,38 @@
# HoloBotDiscord
<h1 align="center">HoloBot Discord</h1>
<p align="center">Small Discord bot made on Py-Cord</p>
<p align="center">
<a href="https://git.end-play.xyz/HoloUA/Discord/src/branch/master/LICENSE"><img alt="License: GPL" src="https://img.shields.io/badge/License-GPL-blue"></a>
<a href="https://git.end-play.xyz/HoloUA/Discord"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
</p>
## Installation
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation/).
2. `git clone https://git.end-play.xyz/HoloUA/Discord.git`
3. `cd Discord`
4. Install Python 3.9+ (at least 3.11 is recommended) for your OS
5. `python3 -m pip install -r requirements.txt`
6. Run it with `python3 main.py` after configuring
## Configuration
There's a file `config_example.json` which contains default configuration
and should be used as a base config.
Copy this file to `config.json` and open it with any text editor of your liking.
Modify the newly created configuration file to fit your needs.
Mandatory keys to modify:
- guild
- bot.owner
- bot.bot_token
- database.*
- categories.*
- channels.*
- roles.*
After all of that you're good to go! Happy using :)

View File

@ -1,60 +1,41 @@
import logging
from typing import Any, Union
from typing import Any, Union, Dict
import discord
import discord.member
from bson import ObjectId
from discord import User, Member
from libbot import config_get
from modules.database import col_users, col_warnings
from modules.utils import config_get
from errors import UserNotFoundError
from modules.database import col_warnings, sync_col_users, sync_col_warnings, col_users
logger = logging.getLogger(__name__)
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)
class HoloUser:
def __init__(
self, user: Union[discord.User, discord.Member, discord.member.Member, int]
) -> None:
def __init__(self, user: Union[User, Member, int]) -> None:
"""Get an object that has a proper binding between Discord ID and database
### Args:
* `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted
* `user` (Union[User, Member, int]): Object from which ID can be extracted
### Raises:
* `UserNotFoundError`: User with such ID does not seem to exist in database
"""
if hasattr(user, "id"):
self.id = user.id # type: ignore
else:
self.id = user
self.id: int = user if not hasattr(user, "id") else user.id
jav_user = col_users.find_one({"user": self.id})
jav_user: Union[Dict[str, Any], None] = sync_col_users.find_one(
{"user": self.id}
)
if jav_user is None:
raise UserNotFoundError(user=user, user_id=self.id)
self.db_id = jav_user["_id"]
self.db_id: ObjectId = jav_user["_id"]
self.customrole = jav_user["customrole"]
self.customchannel = jav_user["customchannel"]
self.warnings = self.warns()
self.customrole: Union[int, None] = jav_user["customrole"]
self.customchannel: Union[int, None] = jav_user["customchannel"]
self.warnings: int = self.warns()
def warns(self) -> int:
"""Get number of warnings user has
@ -62,29 +43,33 @@ class HoloUser:
### Returns:
* `int`: Number of warnings
"""
warns = col_warnings.find_one({"user": self.id})
if warns == None:
return 0
else:
return warns["warns"]
warns: Union[Dict[str, Any], None] = sync_col_warnings.find_one(
{"user": self.id}
)
def warn(self, count=1, reason: str = "Not provided") -> None:
return 0 if warns is None else warns["warns"]
async def warn(self, count=1, reason: str = "Not provided") -> None:
"""Warn and add count to warns number
### Args:
* `count` (int, optional): Count of warnings to be added. Defaults to 1.
"""
warns = col_warnings.find_one({"user": self.id})
if warns != None:
col_warnings.update_one(
filter={"_id": self.db_id},
update={"$set": {"warns": warns["warns"] + count}},
warns: Union[Dict[str, Any], None] = await col_warnings.find_one(
{"user": self.id}
)
if warns is not None:
await col_warnings.update_one(
{"_id": self.db_id},
{"$set": {"warns": warns["warns"] + count}},
)
else:
col_warnings.insert_one(document={"user": self.id, "warns": count})
logger.info(f"User {self.id} was warned {count} times due to: {reason}")
await col_warnings.insert_one(document={"user": self.id, "warns": count})
def set(self, key: str, value: Any) -> None:
logger.info("User %s was warned %s times due to: %s", self.id, count, reason)
async def set(self, key: str, value: Any) -> None:
"""Set attribute data and save it into database
### Args:
@ -93,49 +78,56 @@ class HoloUser:
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
col_users.update_one(
filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
)
logger.info(f"Set attribute {key} of user {self.id} to {value}")
async def is_moderator(
self, member: Union[discord.User, discord.Member, discord.member.Member]
) -> bool:
setattr(self, key, value)
await col_users.update_one(
{"_id": self.db_id}, {"$set": {key: value}}, upsert=True
)
logger.info("Set attribute %s of user %s to %s", key, self.id, value)
@staticmethod
async def is_moderator(member: Union[User, Member]) -> bool:
"""Check if user is moderator or council member
### Args:
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
* `member` (Union[User, Member]): Member object
### Returns:
`bool`: `True` if member is a moderator or member of council and `False` if not
"""
if isinstance(member, discord.User):
return False
moderator_role = await config_get("moderators", "roles")
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id == moderator_role or role.id == council_role:
return True
if isinstance(member, User):
return False
async def is_council(
self, member: Union[discord.User, discord.Member, discord.member.Member]
) -> bool:
moderator_role: Union[int, None] = await config_get("moderators", "roles")
council_role: Union[int, None] = await config_get("council", "roles")
for role in member.roles:
if role.id in (moderator_role, council_role):
return True
return False
@staticmethod
async def is_council(member: Union[User, Member]) -> bool:
"""Check if user is a member of council
### Args:
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
* `member` (Union[User, Member]): Member object
### Returns:
`bool`: `True` if member is a member of council and `False` if not
"""
if isinstance(member, discord.User):
if isinstance(member, User):
return False
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id == council_role:
return True
return False
# def purge(self) -> None:

View File

@ -1,22 +1,37 @@
import logging
import sys
from typing import Union
from discord import ApplicationContext, Embed, User, option, slash_command
from discord import (
ApplicationContext,
Embed,
User,
option,
slash_command,
Role,
TextChannel,
)
from discord import utils as ds_utils
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from enums.colors import Color
from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
from enums import Color
from modules.scheduler import scheduler
from modules.utils_sync import guild_name
from modules.waifu_pics import waifu_pics
logger = logging.getLogger(__name__)
class Admin(commands.Cog):
def __init__(self, client: PycordBot):
self.client = client
"""Cog with utility commands for admins."""
def __init__(self, client: PycordBot):
self.client: PycordBot = client
# Disabled because warning functionality is temporarily not needed
# @slash_command(
# name="warning",
# description="Попередити юзера про порушення правил",
@ -102,7 +117,7 @@ class Admin(commands.Cog):
@slash_command(
name="clear",
description="Видалити деяку кількість повідомлень в каналі",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("amount", description="Кількість")
@option("user", description="Користувач", default=None)
@ -111,7 +126,7 @@ class Admin(commands.Cog):
ctx: ApplicationContext,
amount: int,
user: User,
):
) -> None:
if ctx.user.id in self.client.owner_ids:
logging.info(
"User %s removed %s message(s) in %s",
@ -119,89 +134,107 @@ class Admin(commands.Cog):
amount,
ctx.channel.id,
)
await ctx.respond(
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
)
if user == None:
if user is None:
await ctx.channel.purge(limit=amount)
else:
await ctx.channel.purge(
limit=amount, check=lambda msg: msg.author == user
)
else:
return
logging.warning(
"User %s tried to use /clear but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
color=Color.FAIL,
)
)
mod_role = ds_utils.get(
mod_role: Union[Role, None] = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
admin_chan: Union[TextChannel, None] = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content=f"{mod_role.mention}",
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
color=Color.FAIL,
),
)
@slash_command(
name="reboot",
description="Перезапустити бота",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
async def reboot_cmd(self, ctx: ApplicationContext):
async def reboot_cmd(self, ctx: ApplicationContext) -> None:
await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids:
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
await ctx.respond(
embed=Embed(
title="Вимикаюсь...",
description="Спробую перезавантажитись за 5 секунд",
)
)
scheduler.shutdown()
await self.client.close()
exit()
else:
await waifu_pics._client_session.close()
sys.exit()
logging.warning(
"User %s tried to use /reboot but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
color=Color.FAIL,
)
)
mod_role = ds_utils.get(
mod_role: Union[Role, None] = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
admin_chan: Union[TextChannel, None] = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content=f"{mod_role.mention}",
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
color=Color.FAIL,
),
)
def setup(client: PycordBot):
def setup(client: PycordBot) -> None:
client.add_cog(Admin(client))

View File

@ -1,4 +1,5 @@
import logging
from typing import Dict, List, Any
from discord import Cog, Message
from discord.ext import commands
@ -11,16 +12,17 @@ logger = logging.getLogger(__name__)
class Analytics(commands.Cog):
def __init__(self, client: PycordBot):
self.client = client
self.client: PycordBot = client
@Cog.listener()
async def on_message(self, message: Message):
async def on_message(self, message: Message) -> None:
if (
(message.author != self.client.user)
and (message.author.bot == False)
and (message.author.system == False)
and (message.author.bot is False)
and (message.author.system is False)
):
stickers = []
stickers: List[Dict[str, Any]] = []
for sticker in message.stickers:
stickers.append(
{
@ -31,7 +33,8 @@ class Analytics(commands.Cog):
}
)
attachments = []
attachments: List[Dict[str, Any]] = []
for attachment in message.attachments:
attachments.append(
{
@ -46,7 +49,7 @@ class Analytics(commands.Cog):
}
)
col_analytics.insert_one(
await col_analytics.insert_one(
{
"user": message.author.id,
"channel": message.channel.id,
@ -57,5 +60,5 @@ class Analytics(commands.Cog):
)
def setup(client: PycordBot):
def setup(client: PycordBot) -> None:
client.add_cog(Analytics(client))

View File

@ -1,45 +1,77 @@
from discord import ApplicationContext, Embed, option
import logging
from typing import Any, Dict, Union
from discord import ApplicationContext, Embed, option, TextChannel, Role
from discord import utils as ds_utils
from discord.abc import GuildChannel
from discord.commands import SlashCommandGroup
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from classes.holo_user import HoloUser
from enums.colors import Color
from enums import Color
from modules.database import col_users
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
from modules.utils_sync import guild_name
logger = logging.getLogger(__name__)
class CustomChannels(commands.Cog):
def __init__(self, client: PycordBot):
self.client = client
self.client: PycordBot = client
@commands.Cog.listener()
async def on_guild_channel_delete(self, channel: GuildChannel):
col_users.find_one_and_update(
async def on_guild_channel_delete(self, channel: GuildChannel) -> None:
await col_users.find_one_and_update(
{"customchannel": channel.id}, {"$set": {"customchannel": None}}
)
customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом")
custom_channel_group: SlashCommandGroup = SlashCommandGroup(
"customchannel", "Керування особистим каналом"
)
@customchannel.command(
@custom_channel_group.command(
name="get",
description="Отримати персональний текстовий канал",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки")
async def customchannel_get_cmd(
async def custom_channel_get_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
):
holo_user_ctx = HoloUser(ctx.user)
) -> None:
holo_user_ctx: HoloUser = HoloUser(ctx.user)
# Return if the user is using the command outside of a guild
if not hasattr(ctx.author, "guild"):
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="Виконання за межами сервера не є можливим.",
color=Color.FAIL,
)
)
return
# Return if the user already has a custom channel
if holo_user_ctx.customchannel is not None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.FAIL,
)
)
return
if holo_user_ctx.customchannel == None:
await ctx.defer()
created_channel = await ctx.user.guild.create_text_channel(
created_channel: TextChannel = await ctx.user.guild.create_text_channel(
name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get(
@ -47,6 +79,7 @@ class CustomChannels(commands.Cog):
id=await config_get("customchannels", "categories"),
),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
@ -62,55 +95,58 @@ class CustomChannels(commands.Cog):
embed_links=True,
manage_channels=True,
)
holo_user_ctx.set("customchannel", created_channel.id)
await holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond(
embed=Embed(
title="Створено канал",
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.success,
)
)
bots = await config_get("bots")
for bot in bots:
await created_channel.set_permissions(
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
else:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.fail,
color=Color.SUCCESS,
)
)
@customchannel.command(
bots: Dict[str, Any] = await config_get("bots")
for bot in bots:
role: Union[Role, None] = ds_utils.get(
ctx.user.guild.roles, id=bots[bot]["role"]
)
if role is not None:
await created_channel.set_permissions(
role,
view_channel=False,
)
@custom_channel_group.command(
name="edit",
description="Змінити параметри особистого каналу",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки")
async def customchannel_edit_cmd(
async def custom_channel_edit_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
):
holo_user_ctx = HoloUser(ctx.user)
) -> None:
holo_user_ctx: HoloUser = HoloUser(ctx.user)
custom_channel = ds_utils.get(
custom_channel: Union[TextChannel, None] = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail,
color=Color.FAIL,
)
)
return
await custom_channel.edit(name=name)
await custom_channel.set_permissions(
ctx.user.guild.default_role,
@ -119,68 +155,84 @@ class CustomChannels(commands.Cog):
create_public_threads=threads,
create_private_threads=threads,
)
await ctx.respond(
embed=Embed(
title="Канал змінено",
description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`",
color=Color.fail,
color=Color.FAIL,
)
)
@customchannel.command(
@custom_channel_group.command(
name="remove",
description="Відібрати канал, знищуючи його, та частково повернути кошти",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("confirm", description="Підтвердження операції")
async def customchannel_remove_cmd(
async def custom_channel_remove_cmd(
self, ctx: ApplicationContext, confirm: bool = False
):
holo_user_ctx = HoloUser(ctx.user)
) -> None:
holo_user_ctx: HoloUser = HoloUser(ctx.user)
if holo_user_ctx.customchannel is not None:
await ctx.defer()
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail,
)
)
holo_user_ctx.set("customchannel", None)
return
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.fail,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.default,
)
)
else:
# Return if the user does not have a custom channel
if holo_user_ctx.customchannel is None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас немає особистого каналу.",
color=Color.fail,
color=Color.FAIL,
)
)
return
await ctx.defer()
custom_channel: Union[TextChannel, None] = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.FAIL,
)
)
await holo_user_ctx.set("customchannel", None)
return
# Return if the confirmation is missing
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.FAIL,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
await holo_user_ctx.set("customchannel", None)
try:
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.DEFAULT,
)
)
except Exception as exc:
logger.warning(
"Could not send a custom channel removal confirmation due to: %s", exc
)
def setup(client: PycordBot):
def setup(client: PycordBot) -> None:
client.add_cog(CustomChannels(client))

View File

@ -1,51 +1,87 @@
import logging
from os import makedirs
from pathlib import Path
from typing import Union, List, Dict, Any
from uuid import uuid4
from discord import ApplicationContext, Embed, File, option
from discord import ApplicationContext, Embed, File, option, Role, TextChannel
from discord import utils as ds_utils
from discord.commands import SlashCommandGroup
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from libbot.sync import json_write as sync_json_write
from classes.holo_user import HoloUser
from enums.colors import Color
from enums import Color
from modules.database import col_users
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
from modules.utils_sync import guild_name
logger = logging.getLogger(__name__)
class Data(commands.Cog):
def __init__(self, client: PycordBot):
self.client = client
self.client: PycordBot = client
data = SlashCommandGroup("data", "Керування даними користувачів")
data: SlashCommandGroup = SlashCommandGroup("data", "Керування даними користувачів")
@data.command(
name="export",
description="Експортувати дані",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_export_cmd(self, ctx: ApplicationContext, kind: str):
async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None:
await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or (
await holo_user.is_council(ctx.author)
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await HoloUser.is_council(ctx.author)
):
logging.info(
"Moderator %s exported current users list", guild_name(ctx.user)
"User %s tried to use /export but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
)
)
mod_role: Union[Role, None] = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: Union[TextChannel, None] = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
),
)
return
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
makedirs("tmp", exist_ok=True)
uuid = str(uuid4())
uuid: str = str(uuid4())
if kind == "Користувачі":
users = []
users: List[Dict[str, Any]] = []
for member in ctx.guild.members:
users.append(
@ -57,53 +93,58 @@ class Data(commands.Cog):
}
)
json_write_sync(users, str(Path(f"tmp/{uuid}")))
sync_json_write(users, Path(f"tmp/{uuid}"))
await ctx.respond(
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
)
else:
logging.info(
"User %s tried to use /export but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
@data.command(
name="migrate",
description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None:
await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or (
await holo_user.is_council(ctx.author)
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await HoloUser.is_council(ctx.author)
):
logging.info(
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.FAIL,
)
)
mod_role: Union[Role, None] = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan: Union[TextChannel, None] = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
if admin_chan is not None:
await admin_chan.send(
content="" if mod_role is None else mod_role.mention,
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.FAIL,
),
)
return
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
@ -113,16 +154,18 @@ class Data(commands.Cog):
for member in ctx.guild.members:
if member.bot:
continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
if (await col_users.find_one({"user": member.id})) is None:
user: Dict[str, Any] = {}
defaults: Dict[str, Any] = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
await col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
@ -131,37 +174,10 @@ class Data(commands.Cog):
embed=Embed(
title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.success,
color=Color.SUCCESS,
)
)
else:
logging.info(
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
def setup(client: PycordBot):
def setup(client: PycordBot) -> None:
client.add_cog(Data(client))

View File

@ -2,39 +2,38 @@ import logging
from discord import ApplicationContext, Embed, User, option, slash_command
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from WaifuPicsPython import WaifuAsync
from libbot.sync import config_get as sync_config_get
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
from modules.utils_sync import guild_name
from modules.waifu_pics import waifu_pics
logger = logging.getLogger(__name__)
wafiu_pics = WaifuAsync()
class Fun(commands.Cog):
def __init__(self, client: PycordBot):
self.client = client
self.client: PycordBot = client
@slash_command(
name="action",
description="Провести над користувачем РП дію",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option(
"type",
description="Тип дії, яку хочете провести з користувачем",
choices=config_get_sync("actions").keys(),
choices=sync_config_get("actions").keys(),
)
@option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User):
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None:
await ctx.defer()
action = await config_get("category", "actions", type)
action_verb = await config_get("action", "actions", type)
action: str = await config_get("category", "actions", type)
action_verb: str = await config_get("action", "actions", type)
image = await wafiu_pics.sfw(action)
image_url: str = await waifu_pics.sfw(action)
logger.info(
"User %s (%s) %s %s (%s) with image %s",
@ -43,17 +42,17 @@ class Fun(commands.Cog):
action_verb,
guild_name(user),
user.id,
image,
image_url,
)
embed = Embed(
embed: Embed = Embed(
description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**",
color=0x2F3136,
)
embed.set_image(url=image)
embed.set_image(url=image_url)
await ctx.respond(embed=embed)
def setup(client: PycordBot):
def setup(client: PycordBot) -> None:
client.add_cog(Fun(client))

View File

@ -1,49 +1,51 @@
from discord import Member, Message
from typing import Dict, Any, Union
from discord import Member, Message, TextChannel
from discord import utils as ds_utils
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from modules.database import col_users
from modules.utils import config_get
class Logger(commands.Cog):
def __init__(self, client: PycordBot):
self.client = client
self.client: PycordBot = client
@commands.Cog.listener()
async def on_message(self, message: Message):
if (
(message.author != self.client.user)
and (message.author.bot == False)
and (message.author.system == False)
and (message.author.bot is False)
and (message.author.system is False)
):
if col_users.find_one({"user": message.author.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
if (await col_users.find_one({"user": message.author.id})) is None:
user: Dict[str, Any] = {}
defaults: Dict[str, Any] = await config_get("user", "defaults")
user["user"] = message.author.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
await col_users.insert_one(document=user)
@commands.Cog.listener()
async def on_member_join(self, member: Member):
welcome_chan = ds_utils.get(
async def on_member_join(self, member: Member) -> None:
welcome_chan: Union[TextChannel, None] = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels,
id=await config_get("welcome", "channels", "text"),
)
rules_chan = ds_utils.get(
rules_chan: Union[TextChannel, None] = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels,
id=await config_get("rules", "channels", "text"),
)
if (
(member != self.client.user)
and (member.bot == False)
and (member.system == False)
and (member.bot is False)
and (member.system is False)
):
await welcome_chan.send(
content=(await config_get("welcome", "messages")).format(
@ -51,17 +53,17 @@ class Logger(commands.Cog):
)
)
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
if (await col_users.find_one({"user": member.id})) is None:
user: Dict[str, Any] = {}
defaults: Dict[str, Any] = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
await col_users.insert_one(document=user)
def setup(client: PycordBot):
def setup(client: PycordBot) -> None:
client.add_cog(Logger(client))

1
enums/__init__.py Normal file
View File

@ -0,0 +1 @@
from .colors import Color

View File

@ -1,6 +1,7 @@
from enum import IntEnum
class Color(IntEnum):
fail = 0xd6345b
success = 0x84d961
default = 0xa7a6ab
FAIL = 0xD6345B
SUCCESS = 0x84D961
DEFAULT = 0xA7A6AB

1
errors/__init__.py Normal file
View File

@ -0,0 +1 @@
from .user import UserNotFoundError

10
errors/user.py Normal file
View File

@ -0,0 +1,10 @@
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)

22
main.py
View File

@ -1,11 +1,13 @@
import logging
import sys
from logging import Logger
from discord import Activity, ActivityType
from libbot import config_get
from libbot.sync import config_get as sync_config_get
from modules.client import client
from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync
from modules.scheduler import scheduler
logging.basicConfig(
level=logging.INFO,
@ -13,7 +15,7 @@ logging.basicConfig(
datefmt="[%X]",
)
logger = logging.getLogger(__name__)
logger: Logger = logging.getLogger(__name__)
try:
import uvloop # type: ignore
@ -24,11 +26,11 @@ except ImportError:
@client.event
async def on_ready():
async def on_ready() -> None:
logger.info("Logged in as %s", client.user)
activity_type = await config_get("type", "status")
activity_message = await config_get("message", "status")
activity_type: str = await config_get("type", "status")
activity_message: str = await config_get("message", "status")
if activity_type == "playing":
await client.change_presence(
@ -62,15 +64,15 @@ async def on_ready():
)
def main():
def main() -> None:
client.load_extension("cogs")
try:
scheduler.start()
client.run(config_get_sync("bot_token", "bot"))
client.run(sync_config_get("bot_token", "bot"))
except KeyboardInterrupt:
scheduler.shutdown()
exit()
sys.exit()
if __name__ == "__main__":

View File

@ -1,6 +1,10 @@
from discord import Intents
from libbot.pycord.classes import PycordBot
intents = Intents().all()
from modules.scheduler import scheduler
intents: Intents = Intents().all()
intents.members = True
client = PycordBot(intents=intents)
client: PycordBot = PycordBot(intents=intents, scheduler=scheduler)

View File

@ -1,12 +1,19 @@
from typing import Dict, Any
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase
from libbot.sync import config_get as sync_config_get
from pymongo import MongoClient
from ujson import loads
from pymongo.synchronous.collection import Collection
from pymongo.synchronous.database import Database
with open("config.json", "r", encoding="utf-8") as f:
db_config = loads(f.read())["database"]
f.close()
db_config: Dict[str, Any] = sync_config_get("database")
db_client = MongoClient(
"mongodb://{0}:{1}@{2}:{3}/{4}".format(
con_string: str = (
"mongodb://{0}:{1}/{2}".format(
db_config["host"], db_config["port"], db_config["name"]
)
if db_config["user"] is None or db_config["password"] is None
else "mongodb://{0}:{1}@{2}:{3}/{4}".format(
db_config["user"],
db_config["password"],
db_config["host"],
@ -14,14 +21,20 @@ db_client = MongoClient(
db_config["name"],
)
)
db = db_client.get_database(name=db_config["name"])
collections = db.list_collection_names()
db_client: AsyncClient = AsyncClient(con_string)
db_client_sync: MongoClient = MongoClient(con_string)
for collection in ["users", "warnings", "scheduler", "analytics"]:
if not collection in collections:
db.create_collection(collection)
# Async declarations per default
db: AsyncDatabase = db_client.get_database(name=db_config["name"])
col_users = db.get_collection("users")
col_warnings = db.get_collection("warnings")
col_analytics = db.get_collection("analytics")
col_users: AsyncCollection = db.get_collection("users")
col_warnings: AsyncCollection = db.get_collection("warnings")
col_analytics: AsyncCollection = db.get_collection("analytics")
# Sync declarations as a fallback
sync_db: Database = db_client_sync.get_database(name=db_config["name"])
sync_col_users: Collection = sync_db.get_collection("users")
sync_col_warnings: Collection = sync_db.get_collection("warnings")
sync_col_analytics: Collection = sync_db.get_collection("analytics")

View File

@ -1,3 +1,3 @@
from apscheduler.schedulers.asyncio import AsyncIOScheduler
scheduler = AsyncIOScheduler()
scheduler: AsyncIOScheduler = AsyncIOScheduler()

View File

@ -1,38 +0,0 @@
from typing import Any
import aiofiles
from ujson import dumps, loads
async def json_read(path: str) -> Any:
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: str) -> None:
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
)
async def config_get(key: str, *path: str) -> Any:
this_key = await json_read("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(key: str, value: Any, *path: str) -> None:
this_dict = await json_read("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
await json_write(this_dict, "config.json")
return

View File

@ -1,45 +1,10 @@
from typing import Any, Union
from typing import Union
from discord import Member, User
from ujson import dumps, loads
def json_read_sync(path: str) -> Any:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Union[Member, User]) -> str:
if isinstance(member, User):
return member.name
if member.nick == None:
return member.name
else:
return member.nick
return member.name if member.nick is None else member.nick

3
modules/waifu_pics.py Normal file
View File

@ -0,0 +1,3 @@
from WaifuPicsPython import WaifuAsync
waifu_pics: WaifuAsync = WaifuAsync()

View File

@ -1,8 +1,10 @@
aiofiles==23.2.1
apscheduler==3.10.4
pymongo~=4.7.3
requests~=2.32.3
# Waifu pics related dependencies (not listed directly by waifupics)
aiohttp>=3.10.0
requests>=2.32.2
aiofiles~=24.1.0
apscheduler>=3.10.0
async_pymongo==0.1.11
libbot[speed,pycord]==3.2.3
ujson~=5.10.0
WaifuPicsPython==0.2.0
--extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
libbot[speed,pycord]==3.2.2