diff --git a/README.md b/README.md index e04b533..4a269e2 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,38 @@ -# HoloBotDiscord +

HoloBot Discord

+

Small Discord bot made on Py-Cord

+ +

+License: GPL +Code style: black +

+ +## Installation + +1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation/). +2. `git clone https://git.end-play.xyz/HoloUA/Discord.git` +3. `cd Discord` +4. Install Python 3.9+ (at least 3.11 is recommended) for your OS +5. `python3 -m pip install -r requirements.txt` +6. Run it with `python3 main.py` after configuring + +## Configuration + +There's a file `config_example.json` which contains default configuration +and should be used as a base config. + +Copy this file to `config.json` and open it with any text editor of your liking. + +Modify the newly created configuration file to fit your needs. + +Mandatory keys to modify: + +- guild +- bot.owner +- bot.bot_token +- database.* +- categories.* +- channels.* +- roles.* + +After all of that you're good to go! Happy using :) \ No newline at end of file diff --git a/classes/holo_user.py b/classes/holo_user.py index 1d57392..3c72daf 100644 --- a/classes/holo_user.py +++ b/classes/holo_user.py @@ -1,60 +1,41 @@ import logging -from typing import Any, Union +from typing import Any, Union, Dict -import discord -import discord.member +from bson import ObjectId +from discord import User, Member +from libbot import config_get -from modules.database import col_users, col_warnings -from modules.utils import config_get +from errors import UserNotFoundError +from modules.database import col_warnings, sync_col_users, sync_col_warnings, col_users logger = logging.getLogger(__name__) -class NotEnoughMoneyError(Exception): - """User does not have enough money to do that""" - - pass - - -class UserNotFoundError(Exception): - """HoloUser could not find user with such an ID in database""" - - def __init__(self, user, user_id): - self.user = user - self.user_id = user_id - super().__init__( - f"User of type {type(self.user)} with id {self.user_id} was not found" - ) - - class HoloUser: - def __init__( - self, user: Union[discord.User, discord.Member, discord.member.Member, int] - ) -> None: + def __init__(self, user: Union[User, Member, int]) -> None: """Get an object that has a proper binding between Discord ID and database ### Args: - * `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted + * `user` (Union[User, Member, int]): Object from which ID can be extracted ### Raises: * `UserNotFoundError`: User with such ID does not seem to exist in database """ - if hasattr(user, "id"): - self.id = user.id # type: ignore - else: - self.id = user + self.id: int = user if not hasattr(user, "id") else user.id - jav_user = col_users.find_one({"user": self.id}) + jav_user: Union[Dict[str, Any], None] = sync_col_users.find_one( + {"user": self.id} + ) if jav_user is None: raise UserNotFoundError(user=user, user_id=self.id) - self.db_id = jav_user["_id"] + self.db_id: ObjectId = jav_user["_id"] - self.customrole = jav_user["customrole"] - self.customchannel = jav_user["customchannel"] - self.warnings = self.warns() + self.customrole: Union[int, None] = jav_user["customrole"] + self.customchannel: Union[int, None] = jav_user["customchannel"] + self.warnings: int = self.warns() def warns(self) -> int: """Get number of warnings user has @@ -62,29 +43,33 @@ class HoloUser: ### Returns: * `int`: Number of warnings """ - warns = col_warnings.find_one({"user": self.id}) - if warns == None: - return 0 - else: - return warns["warns"] + warns: Union[Dict[str, Any], None] = sync_col_warnings.find_one( + {"user": self.id} + ) - def warn(self, count=1, reason: str = "Not provided") -> None: + return 0 if warns is None else warns["warns"] + + async def warn(self, count=1, reason: str = "Not provided") -> None: """Warn and add count to warns number ### Args: * `count` (int, optional): Count of warnings to be added. Defaults to 1. """ - warns = col_warnings.find_one({"user": self.id}) - if warns != None: - col_warnings.update_one( - filter={"_id": self.db_id}, - update={"$set": {"warns": warns["warns"] + count}}, + warns: Union[Dict[str, Any], None] = await col_warnings.find_one( + {"user": self.id} + ) + + if warns is not None: + await col_warnings.update_one( + {"_id": self.db_id}, + {"$set": {"warns": warns["warns"] + count}}, ) else: - col_warnings.insert_one(document={"user": self.id, "warns": count}) - logger.info(f"User {self.id} was warned {count} times due to: {reason}") + await col_warnings.insert_one(document={"user": self.id, "warns": count}) - def set(self, key: str, value: Any) -> None: + logger.info("User %s was warned %s times due to: %s", self.id, count, reason) + + async def set(self, key: str, value: Any) -> None: """Set attribute data and save it into database ### Args: @@ -93,49 +78,56 @@ class HoloUser: """ if not hasattr(self, key): raise AttributeError() - setattr(self, key, value) - col_users.update_one( - filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True - ) - logger.info(f"Set attribute {key} of user {self.id} to {value}") - async def is_moderator( - self, member: Union[discord.User, discord.Member, discord.member.Member] - ) -> bool: + setattr(self, key, value) + + await col_users.update_one( + {"_id": self.db_id}, {"$set": {key: value}}, upsert=True + ) + + logger.info("Set attribute %s of user %s to %s", key, self.id, value) + + @staticmethod + async def is_moderator(member: Union[User, Member]) -> bool: """Check if user is moderator or council member ### Args: - * `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object + * `member` (Union[User, Member]): Member object ### Returns: `bool`: `True` if member is a moderator or member of council and `False` if not """ - if isinstance(member, discord.User): + if isinstance(member, User): return False - moderator_role = await config_get("moderators", "roles") - council_role = await config_get("council", "roles") + + moderator_role: Union[int, None] = await config_get("moderators", "roles") + council_role: Union[int, None] = await config_get("council", "roles") + for role in member.roles: - if role.id == moderator_role or role.id == council_role: + if role.id in (moderator_role, council_role): return True + return False - async def is_council( - self, member: Union[discord.User, discord.Member, discord.member.Member] - ) -> bool: + @staticmethod + async def is_council(member: Union[User, Member]) -> bool: """Check if user is a member of council ### Args: - * `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object + * `member` (Union[User, Member]): Member object ### Returns: `bool`: `True` if member is a member of council and `False` if not """ - if isinstance(member, discord.User): + if isinstance(member, User): return False + council_role = await config_get("council", "roles") + for role in member.roles: if role.id == council_role: return True + return False # def purge(self) -> None: diff --git a/cogs/admin.py b/cogs/admin.py index 76da7ad..a8adbdf 100644 --- a/cogs/admin.py +++ b/cogs/admin.py @@ -1,22 +1,37 @@ import logging +import sys +from typing import Union -from discord import ApplicationContext, Embed, User, option, slash_command +from discord import ( + ApplicationContext, + Embed, + User, + option, + slash_command, + Role, + TextChannel, +) from discord import utils as ds_utils from discord.ext import commands +from libbot import config_get from libbot.pycord.classes import PycordBot +from libbot.sync import config_get as sync_config_get -from enums.colors import Color -from modules.scheduled import scheduler -from modules.utils import config_get -from modules.utils_sync import config_get_sync, guild_name +from enums import Color +from modules.scheduler import scheduler +from modules.utils_sync import guild_name +from modules.waifu_pics import waifu_pics logger = logging.getLogger(__name__) class Admin(commands.Cog): - def __init__(self, client: PycordBot): - self.client = client + """Cog with utility commands for admins.""" + def __init__(self, client: PycordBot): + self.client: PycordBot = client + + # Disabled because warning functionality is temporarily not needed # @slash_command( # name="warning", # description="Попередити юзера про порушення правил", @@ -102,7 +117,7 @@ class Admin(commands.Cog): @slash_command( name="clear", description="Видалити деяку кількість повідомлень в каналі", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option("amount", description="Кількість") @option("user", description="Користувач", default=None) @@ -111,7 +126,7 @@ class Admin(commands.Cog): ctx: ApplicationContext, amount: int, user: User, - ): + ) -> None: if ctx.user.id in self.client.owner_ids: logging.info( "User %s removed %s message(s) in %s", @@ -119,89 +134,107 @@ class Admin(commands.Cog): amount, ctx.channel.id, ) + await ctx.respond( embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0 ) - if user == None: + + if user is None: await ctx.channel.purge(limit=amount) else: await ctx.channel.purge( limit=amount, check=lambda msg: msg.author == user ) - else: - logging.warning( - "User %s tried to use /clear but permission denied", - guild_name(ctx.user), - ) - await ctx.respond( - embed=Embed( - title="Відмовлено в доступі", - description="Здається, це команда лише для модераторів", - color=Color.fail, - ) - ) - mod_role = ds_utils.get( - ctx.user.guild.roles, id=await config_get("moderators", "roles") - ) - admin_chan = ds_utils.get( - ctx.user.guild.channels, - id=await config_get("adminchat", "channels", "text"), + + return + + logging.warning( + "User %s tried to use /clear but permission denied", + guild_name(ctx.user), + ) + + await ctx.respond( + embed=Embed( + title="Відмовлено в доступі", + description="Здається, це команда лише для модераторів", + color=Color.FAIL, ) + ) + + mod_role: Union[Role, None] = ds_utils.get( + ctx.user.guild.roles, id=await config_get("moderators", "roles") + ) + admin_chan: Union[TextChannel, None] = ds_utils.get( + ctx.user.guild.channels, + id=await config_get("adminchat", "channels", "text"), + ) + + if admin_chan is not None: await admin_chan.send( - content=f"{mod_role.mention}", + content="" if mod_role is None else mod_role.mention, embed=Embed( title="Неавторизований запит", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", - color=Color.fail, + color=Color.FAIL, ), ) @slash_command( name="reboot", description="Перезапустити бота", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) - async def reboot_cmd(self, ctx: ApplicationContext): + async def reboot_cmd(self, ctx: ApplicationContext) -> None: await ctx.defer(ephemeral=True) + if ctx.user.id in self.client.owner_ids: logging.info("Calling shutdown initiated by %s", guild_name(ctx.user)) + await ctx.respond( embed=Embed( title="Вимикаюсь...", description="Спробую перезавантажитись за 5 секунд", ) ) + scheduler.shutdown() + await self.client.close() - exit() - else: - logging.warning( - "User %s tried to use /reboot but permission denied", - guild_name(ctx.user), - ) - await ctx.respond( - embed=Embed( - title="Відмовлено в доступі", - description="Здається, це команда лише для модераторів", - color=Color.fail, - ) - ) - mod_role = ds_utils.get( - ctx.user.guild.roles, id=await config_get("moderators", "roles") - ) - admin_chan = ds_utils.get( - ctx.user.guild.channels, - id=await config_get("adminchat", "channels", "text"), + await waifu_pics._client_session.close() + + sys.exit() + + logging.warning( + "User %s tried to use /reboot but permission denied", + guild_name(ctx.user), + ) + + await ctx.respond( + embed=Embed( + title="Відмовлено в доступі", + description="Здається, це команда лише для модераторів", + color=Color.FAIL, ) + ) + + mod_role: Union[Role, None] = ds_utils.get( + ctx.user.guild.roles, id=await config_get("moderators", "roles") + ) + admin_chan: Union[TextChannel, None] = ds_utils.get( + ctx.user.guild.channels, + id=await config_get("adminchat", "channels", "text"), + ) + + if admin_chan is not None: await admin_chan.send( - content=f"{mod_role.mention}", + content="" if mod_role is None else mod_role.mention, embed=Embed( title="Неавторизований запит", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", - color=Color.fail, + color=Color.FAIL, ), ) -def setup(client: PycordBot): +def setup(client: PycordBot) -> None: client.add_cog(Admin(client)) diff --git a/cogs/analytics.py b/cogs/analytics.py index e08d204..1849e5d 100644 --- a/cogs/analytics.py +++ b/cogs/analytics.py @@ -1,4 +1,5 @@ import logging +from typing import Dict, List, Any from discord import Cog, Message from discord.ext import commands @@ -11,16 +12,17 @@ logger = logging.getLogger(__name__) class Analytics(commands.Cog): def __init__(self, client: PycordBot): - self.client = client + self.client: PycordBot = client @Cog.listener() - async def on_message(self, message: Message): + async def on_message(self, message: Message) -> None: if ( (message.author != self.client.user) - and (message.author.bot == False) - and (message.author.system == False) + and (message.author.bot is False) + and (message.author.system is False) ): - stickers = [] + stickers: List[Dict[str, Any]] = [] + for sticker in message.stickers: stickers.append( { @@ -31,7 +33,8 @@ class Analytics(commands.Cog): } ) - attachments = [] + attachments: List[Dict[str, Any]] = [] + for attachment in message.attachments: attachments.append( { @@ -46,7 +49,7 @@ class Analytics(commands.Cog): } ) - col_analytics.insert_one( + await col_analytics.insert_one( { "user": message.author.id, "channel": message.channel.id, @@ -57,5 +60,5 @@ class Analytics(commands.Cog): ) -def setup(client: PycordBot): +def setup(client: PycordBot) -> None: client.add_cog(Analytics(client)) diff --git a/cogs/custom_channels.py b/cogs/custom_channels.py index 14c04a7..e55053a 100644 --- a/cogs/custom_channels.py +++ b/cogs/custom_channels.py @@ -1,116 +1,152 @@ -from discord import ApplicationContext, Embed, option +import logging +from typing import Any, Dict, Union + +from discord import ApplicationContext, Embed, option, TextChannel, Role from discord import utils as ds_utils from discord.abc import GuildChannel from discord.commands import SlashCommandGroup from discord.ext import commands +from libbot import config_get from libbot.pycord.classes import PycordBot +from libbot.sync import config_get as sync_config_get from classes.holo_user import HoloUser -from enums.colors import Color +from enums import Color from modules.database import col_users -from modules.utils import config_get -from modules.utils_sync import config_get_sync, guild_name +from modules.utils_sync import guild_name + +logger = logging.getLogger(__name__) class CustomChannels(commands.Cog): def __init__(self, client: PycordBot): - self.client = client + self.client: PycordBot = client @commands.Cog.listener() - async def on_guild_channel_delete(self, channel: GuildChannel): - col_users.find_one_and_update( + async def on_guild_channel_delete(self, channel: GuildChannel) -> None: + await col_users.find_one_and_update( {"customchannel": channel.id}, {"$set": {"customchannel": None}} ) - customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом") + custom_channel_group: SlashCommandGroup = SlashCommandGroup( + "customchannel", "Керування особистим каналом" + ) - @customchannel.command( + @custom_channel_group.command( name="get", description="Отримати персональний текстовий канал", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option("name", description="Назва каналу") @option("reactions", description="Дозволити реакції") @option("threads", description="Дозволити гілки") - async def customchannel_get_cmd( + async def custom_channel_get_cmd( self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool - ): - holo_user_ctx = HoloUser(ctx.user) + ) -> None: + holo_user_ctx: HoloUser = HoloUser(ctx.user) - if holo_user_ctx.customchannel == None: - await ctx.defer() - created_channel = await ctx.user.guild.create_text_channel( - name=name, - reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал", - category=ds_utils.get( - ctx.author.guild.categories, - id=await config_get("customchannels", "categories"), - ), - ) - await created_channel.set_permissions( - ctx.user.guild.default_role, - send_messages=False, - add_reactions=reactions, - create_public_threads=threads, - create_private_threads=threads, - ) - await created_channel.set_permissions( - ctx.user, - attach_files=True, - manage_messages=True, - send_messages=True, - embed_links=True, - manage_channels=True, - ) - holo_user_ctx.set("customchannel", created_channel.id) + # Return if the user is using the command outside of a guild + if not hasattr(ctx.author, "guild"): + await ctx.defer(ephemeral=True) await ctx.respond( embed=Embed( - title="Створено канал", - description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`", - color=Color.success, + title="Помилка виконання", + description="Виконання за межами сервера не є можливим.", + color=Color.FAIL, ) ) - bots = await config_get("bots") - for bot in bots: - await created_channel.set_permissions( - ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]), - view_channel=False, - ) - else: + return + + # Return if the user already has a custom channel + if holo_user_ctx.customchannel is not None: await ctx.defer(ephemeral=True) await ctx.respond( embed=Embed( title="Помилка виконання", description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.", - color=Color.fail, + color=Color.FAIL, ) ) + return - @customchannel.command( + await ctx.defer() + + created_channel: TextChannel = await ctx.user.guild.create_text_channel( + name=name, + reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал", + category=ds_utils.get( + ctx.author.guild.categories, + id=await config_get("customchannels", "categories"), + ), + ) + + await created_channel.set_permissions( + ctx.user.guild.default_role, + send_messages=False, + add_reactions=reactions, + create_public_threads=threads, + create_private_threads=threads, + ) + await created_channel.set_permissions( + ctx.user, + attach_files=True, + manage_messages=True, + send_messages=True, + embed_links=True, + manage_channels=True, + ) + + await holo_user_ctx.set("customchannel", created_channel.id) + + await ctx.respond( + embed=Embed( + title="Створено канал", + description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`", + color=Color.SUCCESS, + ) + ) + + bots: Dict[str, Any] = await config_get("bots") + + for bot in bots: + role: Union[Role, None] = ds_utils.get( + ctx.user.guild.roles, id=bots[bot]["role"] + ) + + if role is not None: + await created_channel.set_permissions( + role, + view_channel=False, + ) + + @custom_channel_group.command( name="edit", description="Змінити параметри особистого каналу", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option("name", description="Назва каналу") @option("reactions", description="Дозволити реакції") @option("threads", description="Дозволити гілки") - async def customchannel_edit_cmd( + async def custom_channel_edit_cmd( self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool - ): - holo_user_ctx = HoloUser(ctx.user) + ) -> None: + holo_user_ctx: HoloUser = HoloUser(ctx.user) - custom_channel = ds_utils.get( + custom_channel: Union[TextChannel, None] = ds_utils.get( ctx.guild.channels, id=holo_user_ctx.customchannel ) + + # Return if the channel was not found if custom_channel is None: await ctx.respond( embed=Embed( title="Канал не знайдено", description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", - color=Color.fail, + color=Color.FAIL, ) ) return + await custom_channel.edit(name=name) await custom_channel.set_permissions( ctx.user.guild.default_role, @@ -119,68 +155,84 @@ class CustomChannels(commands.Cog): create_public_threads=threads, create_private_threads=threads, ) + await ctx.respond( embed=Embed( title="Канал змінено", description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`", - color=Color.fail, + color=Color.FAIL, ) ) - @customchannel.command( + @custom_channel_group.command( name="remove", description="Відібрати канал, знищуючи його, та частково повернути кошти", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option("confirm", description="Підтвердження операції") - async def customchannel_remove_cmd( + async def custom_channel_remove_cmd( self, ctx: ApplicationContext, confirm: bool = False - ): - holo_user_ctx = HoloUser(ctx.user) + ) -> None: + holo_user_ctx: HoloUser = HoloUser(ctx.user) - if holo_user_ctx.customchannel is not None: - await ctx.defer() - custom_channel = ds_utils.get( - ctx.guild.channels, id=holo_user_ctx.customchannel - ) - if custom_channel is None: - await ctx.respond( - embed=Embed( - title="Канал не знайдено", - description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", - color=Color.fail, - ) - ) - holo_user_ctx.set("customchannel", None) - return - if not confirm: - await ctx.respond( - embed=Embed( - title="Підтвердження не надано", - description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.", - color=Color.fail, - ) - ) - return - await custom_channel.delete(reason="Власник запросив видалення") - holo_user_ctx.set("customchannel", None) - await ctx.respond( - embed=Embed( - title="Канал знищено", - description="Ви відмовились від каналу та видалили його.", - color=Color.default, - ) - ) - else: + # Return if the user does not have a custom channel + if holo_user_ctx.customchannel is None: await ctx.defer(ephemeral=True) await ctx.respond( embed=Embed( title="Помилка виконання", description="У вас немає особистого каналу.", - color=Color.fail, + color=Color.FAIL, ) ) + return + + await ctx.defer() + + custom_channel: Union[TextChannel, None] = ds_utils.get( + ctx.guild.channels, id=holo_user_ctx.customchannel + ) + + # Return if the channel was not found + if custom_channel is None: + await ctx.respond( + embed=Embed( + title="Канал не знайдено", + description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", + color=Color.FAIL, + ) + ) + await holo_user_ctx.set("customchannel", None) + return + + # Return if the confirmation is missing + if not confirm: + await ctx.respond( + embed=Embed( + title="Підтвердження не надано", + description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.", + color=Color.FAIL, + ) + ) + return + + await custom_channel.delete(reason="Власник запросив видалення") + + await holo_user_ctx.set("customchannel", None) + + try: + await ctx.respond( + embed=Embed( + title="Канал знищено", + description="Ви відмовились від каналу та видалили його.", + color=Color.DEFAULT, + ) + ) + except Exception as exc: + logger.warning( + "Could not send a custom channel removal confirmation due to: %s", exc + ) -def setup(client: PycordBot): +def setup(client: PycordBot) -> None: client.add_cog(CustomChannels(client)) diff --git a/cogs/data.py b/cogs/data.py index f7249f5..716f1b0 100644 --- a/cogs/data.py +++ b/cogs/data.py @@ -1,167 +1,183 @@ import logging from os import makedirs from pathlib import Path +from typing import Union, List, Dict, Any from uuid import uuid4 -from discord import ApplicationContext, Embed, File, option +from discord import ApplicationContext, Embed, File, option, Role, TextChannel from discord import utils as ds_utils from discord.commands import SlashCommandGroup from discord.ext import commands +from libbot import config_get from libbot.pycord.classes import PycordBot +from libbot.sync import config_get as sync_config_get +from libbot.sync import json_write as sync_json_write from classes.holo_user import HoloUser -from enums.colors import Color +from enums import Color from modules.database import col_users -from modules.utils import config_get -from modules.utils_sync import config_get_sync, guild_name, json_write_sync +from modules.utils_sync import guild_name logger = logging.getLogger(__name__) class Data(commands.Cog): def __init__(self, client: PycordBot): - self.client = client + self.client: PycordBot = client - data = SlashCommandGroup("data", "Керування даними користувачів") + data: SlashCommandGroup = SlashCommandGroup("data", "Керування даними користувачів") @data.command( name="export", description="Експортувати дані", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option( "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] ) - async def data_export_cmd(self, ctx: ApplicationContext, kind: str): + async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None: await ctx.defer() - holo_user = HoloUser(ctx.author) - if (ctx.user.id in self.client.owner_ids) or ( - await holo_user.is_council(ctx.author) + + # Return if the user is not an owner and not in the council + if (ctx.user.id not in self.client.owner_ids) and not ( + await HoloUser.is_council(ctx.author) ): - logging.info( - "Moderator %s exported current users list", guild_name(ctx.user) - ) - makedirs("tmp", exist_ok=True) - uuid = str(uuid4()) - - if kind == "Користувачі": - users = [] - - for member in ctx.guild.members: - users.append( - { - "id": member.id, - "nick": member.nick, - "username": f"{member.name}#{member.discriminator}", - "bot": member.bot, - } - ) - - json_write_sync(users, str(Path(f"tmp/{uuid}"))) - - await ctx.respond( - file=File(str(Path(f"tmp/{uuid}")), filename="users.json") - ) - else: logging.info( "User %s tried to use /export but permission denied", guild_name(ctx.user), ) + await ctx.respond( embed=Embed( title="Відмовлено в доступі", description="Здається, це команда лише для модераторів", - color=Color.fail, + color=Color.FAIL, ) ) - mod_role = ds_utils.get( + + mod_role: Union[Role, None] = ds_utils.get( ctx.user.guild.roles, id=await config_get("moderators", "roles") ) - admin_chan = ds_utils.get( + admin_chan: Union[TextChannel, None] = ds_utils.get( ctx.user.guild.channels, id=await config_get("adminchat", "channels", "text"), ) + await admin_chan.send( - content=f"{mod_role.mention}", + content="" if mod_role is None else mod_role.mention, embed=Embed( title="Неавторизований запит", description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", - color=Color.fail, + color=Color.FAIL, ), ) + return + + logging.info("Moderator %s exported current users list", guild_name(ctx.user)) + + makedirs("tmp", exist_ok=True) + + uuid: str = str(uuid4()) + + if kind == "Користувачі": + users: List[Dict[str, Any]] = [] + + for member in ctx.guild.members: + users.append( + { + "id": member.id, + "nick": member.nick, + "username": f"{member.name}#{member.discriminator}", + "bot": member.bot, + } + ) + + sync_json_write(users, Path(f"tmp/{uuid}")) + + await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json")) + @data.command( name="migrate", description="Мігрувати всіх користувачів до бази", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option( "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] ) - async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str): + async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None: await ctx.defer() - holo_user = HoloUser(ctx.author) - if (ctx.user.id in self.client.owner_ids) or ( - await holo_user.is_council(ctx.author) + + # Return if the user is not an owner and not in the council + if (ctx.user.id not in self.client.owner_ids) and not ( + await HoloUser.is_council(ctx.author) ): - logging.info( - "Moderator %s started migration of all members to the database", - guild_name(ctx.user), - ) - - if kind == "Користувачі": - for member in ctx.guild.members: - if member.bot: - continue - if col_users.find_one({"user": member.id}) is None: - user = {} - defaults = await config_get("user", "defaults") - - user["user"] = member.id - - for key in defaults: - user[key] = defaults[key] - - col_users.insert_one(document=user) - logging.info( - "Added DB record for user %s during migration", member.id - ) - - await ctx.respond( - embed=Embed( - title="Міграцію завершено", - description="Всім користувачам сервера було створено записи в базі даних.", - color=Color.success, - ) - ) - else: logging.info( "User %s tried to use /migrate but permission denied", guild_name(ctx.user), ) + await ctx.respond( embed=Embed( title="Відмовлено в доступі", description="Здається, це команда лише для модераторів", - color=Color.fail, + color=Color.FAIL, ) ) - mod_role = ds_utils.get( + + mod_role: Union[Role, None] = ds_utils.get( ctx.user.guild.roles, id=await config_get("moderators", "roles") ) - admin_chan = ds_utils.get( + admin_chan: Union[TextChannel, None] = ds_utils.get( ctx.user.guild.channels, id=await config_get("adminchat", "channels", "text"), ) - await admin_chan.send( - content=f"{mod_role.mention}", - embed=Embed( - title="Неавторизований запит", - description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", - color=Color.fail, - ), + + if admin_chan is not None: + await admin_chan.send( + content="" if mod_role is None else mod_role.mention, + embed=Embed( + title="Неавторизований запит", + description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.", + color=Color.FAIL, + ), + ) + + return + + logging.info( + "Moderator %s started migration of all members to the database", + guild_name(ctx.user), + ) + + if kind == "Користувачі": + for member in ctx.guild.members: + if member.bot: + continue + + if (await col_users.find_one({"user": member.id})) is None: + user: Dict[str, Any] = {} + defaults: Dict[str, Any] = await config_get("user", "defaults") + + user["user"] = member.id + + for key in defaults: + user[key] = defaults[key] + + await col_users.insert_one(document=user) + + logging.info( + "Added DB record for user %s during migration", member.id + ) + + await ctx.respond( + embed=Embed( + title="Міграцію завершено", + description="Всім користувачам сервера було створено записи в базі даних.", + color=Color.SUCCESS, ) + ) -def setup(client: PycordBot): +def setup(client: PycordBot) -> None: client.add_cog(Data(client)) diff --git a/cogs/fun.py b/cogs/fun.py index c539b09..a32adcd 100644 --- a/cogs/fun.py +++ b/cogs/fun.py @@ -2,39 +2,38 @@ import logging from discord import ApplicationContext, Embed, User, option, slash_command from discord.ext import commands +from libbot import config_get from libbot.pycord.classes import PycordBot -from WaifuPicsPython import WaifuAsync +from libbot.sync import config_get as sync_config_get -from modules.utils import config_get -from modules.utils_sync import config_get_sync, guild_name +from modules.utils_sync import guild_name +from modules.waifu_pics import waifu_pics logger = logging.getLogger(__name__) -wafiu_pics = WaifuAsync() - class Fun(commands.Cog): def __init__(self, client: PycordBot): - self.client = client + self.client: PycordBot = client @slash_command( name="action", description="Провести над користувачем РП дію", - guild_ids=[config_get_sync("guild")], + guild_ids=[sync_config_get("guild")], ) @option( "type", description="Тип дії, яку хочете провести з користувачем", - choices=config_get_sync("actions").keys(), + choices=sync_config_get("actions").keys(), ) @option("user", description="Користувач") - async def action_cmd(self, ctx: ApplicationContext, type: str, user: User): + async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None: await ctx.defer() - action = await config_get("category", "actions", type) - action_verb = await config_get("action", "actions", type) + action: str = await config_get("category", "actions", type) + action_verb: str = await config_get("action", "actions", type) - image = await wafiu_pics.sfw(action) + image_url: str = await waifu_pics.sfw(action) logger.info( "User %s (%s) %s %s (%s) with image %s", @@ -43,17 +42,17 @@ class Fun(commands.Cog): action_verb, guild_name(user), user.id, - image, + image_url, ) - embed = Embed( + embed: Embed = Embed( description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**", color=0x2F3136, ) - embed.set_image(url=image) + embed.set_image(url=image_url) await ctx.respond(embed=embed) -def setup(client: PycordBot): +def setup(client: PycordBot) -> None: client.add_cog(Fun(client)) diff --git a/cogs/logger.py b/cogs/logger.py index a3d3d61..7070188 100644 --- a/cogs/logger.py +++ b/cogs/logger.py @@ -1,49 +1,51 @@ -from discord import Member, Message +from typing import Dict, Any, Union + +from discord import Member, Message, TextChannel from discord import utils as ds_utils from discord.ext import commands +from libbot import config_get from libbot.pycord.classes import PycordBot from modules.database import col_users -from modules.utils import config_get class Logger(commands.Cog): def __init__(self, client: PycordBot): - self.client = client + self.client: PycordBot = client @commands.Cog.listener() async def on_message(self, message: Message): if ( (message.author != self.client.user) - and (message.author.bot == False) - and (message.author.system == False) + and (message.author.bot is False) + and (message.author.system is False) ): - if col_users.find_one({"user": message.author.id}) is None: - user = {} - defaults = await config_get("user", "defaults") + if (await col_users.find_one({"user": message.author.id})) is None: + user: Dict[str, Any] = {} + defaults: Dict[str, Any] = await config_get("user", "defaults") user["user"] = message.author.id for key in defaults: user[key] = defaults[key] - col_users.insert_one(document=user) + await col_users.insert_one(document=user) @commands.Cog.listener() - async def on_member_join(self, member: Member): - welcome_chan = ds_utils.get( + async def on_member_join(self, member: Member) -> None: + welcome_chan: Union[TextChannel, None] = ds_utils.get( self.client.get_guild(await config_get("guild")).channels, id=await config_get("welcome", "channels", "text"), ) - rules_chan = ds_utils.get( + rules_chan: Union[TextChannel, None] = ds_utils.get( self.client.get_guild(await config_get("guild")).channels, id=await config_get("rules", "channels", "text"), ) if ( (member != self.client.user) - and (member.bot == False) - and (member.system == False) + and (member.bot is False) + and (member.system is False) ): await welcome_chan.send( content=(await config_get("welcome", "messages")).format( @@ -51,17 +53,17 @@ class Logger(commands.Cog): ) ) - if col_users.find_one({"user": member.id}) is None: - user = {} - defaults = await config_get("user", "defaults") + if (await col_users.find_one({"user": member.id})) is None: + user: Dict[str, Any] = {} + defaults: Dict[str, Any] = await config_get("user", "defaults") user["user"] = member.id for key in defaults: user[key] = defaults[key] - col_users.insert_one(document=user) + await col_users.insert_one(document=user) -def setup(client: PycordBot): +def setup(client: PycordBot) -> None: client.add_cog(Logger(client)) diff --git a/enums/__init__.py b/enums/__init__.py new file mode 100644 index 0000000..87c7008 --- /dev/null +++ b/enums/__init__.py @@ -0,0 +1 @@ +from .colors import Color diff --git a/enums/colors.py b/enums/colors.py index 8f647b2..d4549c4 100644 --- a/enums/colors.py +++ b/enums/colors.py @@ -1,6 +1,7 @@ from enum import IntEnum + class Color(IntEnum): - fail = 0xd6345b - success = 0x84d961 - default = 0xa7a6ab \ No newline at end of file + FAIL = 0xD6345B + SUCCESS = 0x84D961 + DEFAULT = 0xA7A6AB diff --git a/errors/__init__.py b/errors/__init__.py new file mode 100644 index 0000000..ec5a055 --- /dev/null +++ b/errors/__init__.py @@ -0,0 +1 @@ +from .user import UserNotFoundError diff --git a/errors/user.py b/errors/user.py new file mode 100644 index 0000000..b34b6e7 --- /dev/null +++ b/errors/user.py @@ -0,0 +1,10 @@ +class UserNotFoundError(Exception): + """HoloUser could not find user with such an ID in database""" + + def __init__(self, user, user_id): + self.user = user + self.user_id = user_id + + super().__init__( + f"User of type {type(self.user)} with id {self.user_id} was not found" + ) diff --git a/main.py b/main.py index 28da765..50b4093 100644 --- a/main.py +++ b/main.py @@ -1,11 +1,13 @@ import logging +import sys +from logging import Logger from discord import Activity, ActivityType +from libbot import config_get +from libbot.sync import config_get as sync_config_get from modules.client import client -from modules.scheduled import scheduler -from modules.utils import config_get -from modules.utils_sync import config_get_sync +from modules.scheduler import scheduler logging.basicConfig( level=logging.INFO, @@ -13,7 +15,7 @@ logging.basicConfig( datefmt="[%X]", ) -logger = logging.getLogger(__name__) +logger: Logger = logging.getLogger(__name__) try: import uvloop # type: ignore @@ -24,11 +26,11 @@ except ImportError: @client.event -async def on_ready(): +async def on_ready() -> None: logger.info("Logged in as %s", client.user) - activity_type = await config_get("type", "status") - activity_message = await config_get("message", "status") + activity_type: str = await config_get("type", "status") + activity_message: str = await config_get("message", "status") if activity_type == "playing": await client.change_presence( @@ -62,15 +64,15 @@ async def on_ready(): ) -def main(): +def main() -> None: client.load_extension("cogs") try: scheduler.start() - client.run(config_get_sync("bot_token", "bot")) + client.run(sync_config_get("bot_token", "bot")) except KeyboardInterrupt: scheduler.shutdown() - exit() + sys.exit() if __name__ == "__main__": diff --git a/modules/client.py b/modules/client.py index 407b54e..926632b 100644 --- a/modules/client.py +++ b/modules/client.py @@ -1,6 +1,10 @@ from discord import Intents from libbot.pycord.classes import PycordBot -intents = Intents().all() +from modules.scheduler import scheduler + +intents: Intents = Intents().all() + intents.members = True -client = PycordBot(intents=intents) + +client: PycordBot = PycordBot(intents=intents, scheduler=scheduler) diff --git a/modules/database.py b/modules/database.py index bb7eaaf..8d2d0ae 100644 --- a/modules/database.py +++ b/modules/database.py @@ -1,12 +1,19 @@ +from typing import Dict, Any + +from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase +from libbot.sync import config_get as sync_config_get from pymongo import MongoClient -from ujson import loads +from pymongo.synchronous.collection import Collection +from pymongo.synchronous.database import Database -with open("config.json", "r", encoding="utf-8") as f: - db_config = loads(f.read())["database"] - f.close() +db_config: Dict[str, Any] = sync_config_get("database") -db_client = MongoClient( - "mongodb://{0}:{1}@{2}:{3}/{4}".format( +con_string: str = ( + "mongodb://{0}:{1}/{2}".format( + db_config["host"], db_config["port"], db_config["name"] + ) + if db_config["user"] is None or db_config["password"] is None + else "mongodb://{0}:{1}@{2}:{3}/{4}".format( db_config["user"], db_config["password"], db_config["host"], @@ -14,14 +21,20 @@ db_client = MongoClient( db_config["name"], ) ) -db = db_client.get_database(name=db_config["name"]) -collections = db.list_collection_names() +db_client: AsyncClient = AsyncClient(con_string) +db_client_sync: MongoClient = MongoClient(con_string) -for collection in ["users", "warnings", "scheduler", "analytics"]: - if not collection in collections: - db.create_collection(collection) +# Async declarations per default +db: AsyncDatabase = db_client.get_database(name=db_config["name"]) -col_users = db.get_collection("users") -col_warnings = db.get_collection("warnings") -col_analytics = db.get_collection("analytics") +col_users: AsyncCollection = db.get_collection("users") +col_warnings: AsyncCollection = db.get_collection("warnings") +col_analytics: AsyncCollection = db.get_collection("analytics") + +# Sync declarations as a fallback +sync_db: Database = db_client_sync.get_database(name=db_config["name"]) + +sync_col_users: Collection = sync_db.get_collection("users") +sync_col_warnings: Collection = sync_db.get_collection("warnings") +sync_col_analytics: Collection = sync_db.get_collection("analytics") diff --git a/modules/scheduled.py b/modules/scheduler.py similarity index 55% rename from modules/scheduled.py rename to modules/scheduler.py index a5eb79d..54ace84 100644 --- a/modules/scheduled.py +++ b/modules/scheduler.py @@ -1,3 +1,3 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler -scheduler = AsyncIOScheduler() +scheduler: AsyncIOScheduler = AsyncIOScheduler() diff --git a/modules/utils.py b/modules/utils.py deleted file mode 100644 index 8a4f34a..0000000 --- a/modules/utils.py +++ /dev/null @@ -1,38 +0,0 @@ -from typing import Any - -import aiofiles -from ujson import dumps, loads - - -async def json_read(path: str) -> Any: - async with aiofiles.open(path, mode="r", encoding="utf-8") as f: - data = await f.read() - return loads(data) - - -async def json_write(data: Any, path: str) -> None: - async with aiofiles.open(path, mode="w", encoding="utf-8") as f: - await f.write( - dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4) - ) - - -async def config_get(key: str, *path: str) -> Any: - this_key = await json_read("config.json") - for dict_key in path: - this_key = this_key[dict_key] - return this_key[key] - - -async def config_set(key: str, value: Any, *path: str) -> None: - this_dict = await json_read("config.json") - string = "this_dict" - for arg in path: - string += f'["{arg}"]' - if type(value) in [str]: - string += f'["{key}"] = "{value}"' - else: - string += f'["{key}"] = {value}' - exec(string) - await json_write(this_dict, "config.json") - return diff --git a/modules/utils_sync.py b/modules/utils_sync.py index 3f80df6..b5690f3 100644 --- a/modules/utils_sync.py +++ b/modules/utils_sync.py @@ -1,45 +1,10 @@ -from typing import Any, Union +from typing import Union from discord import Member, User -from ujson import dumps, loads - - -def json_read_sync(path: str) -> Any: - with open(path, mode="r", encoding="utf-8") as f: - data = f.read() - return loads(data) - - -def json_write_sync(data: Any, path: str) -> None: - with open(path, mode="w", encoding="utf-8") as f: - f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)) - - -def config_get_sync(key: str, *path: str) -> Any: - this_key = json_read_sync("config.json") - for dict_key in path: - this_key = this_key[dict_key] - return this_key[key] - - -def config_set_sync(key: str, value: Any, *path: str) -> None: - this_dict = json_read_sync("config.json") - string = "this_dict" - for arg in path: - string += f'["{arg}"]' - if type(value) in [str]: - string += f'["{key}"] = "{value}"' - else: - string += f'["{key}"] = {value}' - exec(string) - json_write_sync(this_dict, "config.json") - return def guild_name(member: Union[Member, User]) -> str: if isinstance(member, User): return member.name - if member.nick == None: - return member.name - else: - return member.nick + + return member.name if member.nick is None else member.nick diff --git a/modules/waifu_pics.py b/modules/waifu_pics.py new file mode 100644 index 0000000..fe3d158 --- /dev/null +++ b/modules/waifu_pics.py @@ -0,0 +1,3 @@ +from WaifuPicsPython import WaifuAsync + +waifu_pics: WaifuAsync = WaifuAsync() diff --git a/requirements.txt b/requirements.txt index 0b6c225..0bd201d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,10 @@ -aiofiles==23.2.1 -apscheduler==3.10.4 -pymongo~=4.7.3 -requests~=2.32.3 +# Waifu pics related dependencies (not listed directly by waifupics) +aiohttp>=3.10.0 +requests>=2.32.2 + +aiofiles~=24.1.0 +apscheduler>=3.10.0 +async_pymongo==0.1.11 +libbot[speed,pycord]==3.2.3 ujson~=5.10.0 -WaifuPicsPython==0.2.0 ---extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple -libbot[speed,pycord]==3.2.2 \ No newline at end of file +WaifuPicsPython==0.2.0 \ No newline at end of file