9 Commits

10 changed files with 88 additions and 126 deletions

View File

@@ -1,5 +1,5 @@
import logging import logging
from typing import Any, Union, Dict from typing import Any, Dict
from bson import ObjectId from bson import ObjectId
from discord import User, Member from discord import User, Member
@@ -12,11 +12,11 @@ logger = logging.getLogger(__name__)
class HoloUser: class HoloUser:
def __init__(self, user: Union[User, Member, int]) -> None: def __init__(self, user: User | Member | int) -> None:
"""Get an object that has a proper binding between Discord ID and database """Get an object that has a proper binding between Discord ID and database
### Args: ### Args:
* `user` (Union[User, Member, int]): Object from which ID can be extracted * `user` (User | Member | int): Object from which ID can be extracted
### Raises: ### Raises:
* `UserNotFoundError`: User with such ID does not seem to exist in database * `UserNotFoundError`: User with such ID does not seem to exist in database
@@ -24,17 +24,15 @@ class HoloUser:
self.id: int = user if not hasattr(user, "id") else user.id self.id: int = user if not hasattr(user, "id") else user.id
jav_user: Union[Dict[str, Any], None] = sync_col_users.find_one( jav_user: Dict[str, Any] | None = sync_col_users.find_one({"user": self.id})
{"user": self.id}
)
if jav_user is None: if jav_user is None:
raise UserNotFoundError(user=user, user_id=self.id) raise UserNotFoundError(user=user, user_id=self.id)
self.db_id: ObjectId = jav_user["_id"] self.db_id: ObjectId = jav_user["_id"]
self.customrole: Union[int, None] = jav_user["customrole"] self.customrole: int | None = jav_user["customrole"]
self.customchannel: Union[int, None] = jav_user["customchannel"] self.customchannel: int | None = jav_user["customchannel"]
self.warnings: int = self.warns() self.warnings: int = self.warns()
def warns(self) -> int: def warns(self) -> int:
@@ -43,9 +41,7 @@ class HoloUser:
### Returns: ### Returns:
* `int`: Number of warnings * `int`: Number of warnings
""" """
warns: Union[Dict[str, Any], None] = sync_col_warnings.find_one( warns: Dict[str, Any] | None = sync_col_warnings.find_one({"user": self.id})
{"user": self.id}
)
return 0 if warns is None else warns["warns"] return 0 if warns is None else warns["warns"]
@@ -55,9 +51,7 @@ class HoloUser:
### Args: ### Args:
* `count` (int, optional): Count of warnings to be added. Defaults to 1. * `count` (int, optional): Count of warnings to be added. Defaults to 1.
""" """
warns: Union[Dict[str, Any], None] = await col_warnings.find_one( warns: Dict[str, Any] | None = await col_warnings.find_one({"user": self.id})
{"user": self.id}
)
if warns is not None: if warns is not None:
await col_warnings.update_one( await col_warnings.update_one(
@@ -88,11 +82,11 @@ class HoloUser:
logger.info("Set attribute %s of user %s to %s", key, self.id, value) logger.info("Set attribute %s of user %s to %s", key, self.id, value)
@staticmethod @staticmethod
async def is_moderator(member: Union[User, Member]) -> bool: async def is_moderator(member: User | Member) -> bool:
"""Check if user is moderator or council member """Check if user is moderator or council member
### Args: ### Args:
* `member` (Union[User, Member]): Member object * `member` (User | Member): Member object
### Returns: ### Returns:
`bool`: `True` if member is a moderator or member of council and `False` if not `bool`: `True` if member is a moderator or member of council and `False` if not
@@ -100,8 +94,8 @@ class HoloUser:
if isinstance(member, User): if isinstance(member, User):
return False return False
moderator_role: Union[int, None] = await config_get("moderators", "roles") moderator_role: int | None = await config_get("moderators", "roles")
council_role: Union[int, None] = await config_get("council", "roles") council_role: int | None = await config_get("council", "roles")
for role in member.roles: for role in member.roles:
if role.id in (moderator_role, council_role): if role.id in (moderator_role, council_role):
@@ -110,11 +104,11 @@ class HoloUser:
return False return False
@staticmethod @staticmethod
async def is_council(member: Union[User, Member]) -> bool: async def is_council(member: User | Member) -> bool:
"""Check if user is a member of council """Check if user is a member of council
### Args: ### Args:
* `member` (Union[User, Member]): Member object * `member` (User | Member): Member object
### Returns: ### Returns:
`bool`: `True` if member is a member of council and `False` if not `bool`: `True` if member is a member of council and `False` if not

View File

@@ -1,6 +1,5 @@
import logging import logging
import sys import sys
from typing import Union
from discord import ( from discord import (
ApplicationContext, ApplicationContext,
@@ -30,89 +29,6 @@ class Admin(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: HoloBot):
self.client: HoloBot = client self.client: HoloBot = client
# Disabled because warning functionality is temporarily not needed
# @slash_command(
# name="warning",
# description="Попередити юзера про порушення правил",
# guild_ids=[config_get_sync("guild")],
# )
# @option("user", description="Користувач")
# @option("reason", description="Причина")
# async def warn_cmd(
# self,
# ctx: ApplicationContext,
# user: User,
# reason: str = "Не вказана",
# ):
# logging.info(f"User {ctx.user.id} warned {user.id} for {reason}")
# await ctx.defer()
# jav_user = HoloUser(user)
# if ctx.user.id in await config_get("admins"):
# logging.info(
# f"Moderator {guild_name(ctx.user)} warned {guild_name(user)} for {reason} (has {jav_user.warnings} warns)"
# )
# if jav_user.warnings >= 5:
# logging.info(
# f"User {guild_name(user)} was banned due to a big amount of warns ({jav_user.warnings})"
# )
# await user.send(
# embed=Embed(
# title="Перманентне блокування",
# description=f"Вас було заблоковано за неодноразове порушення правил сервера.",
# color=Color.fail,
# )
# )
# await user.ban(reason=reason)
# elif jav_user.warnings >= 2:
# logging.info(
# f"User {guild_name(user)} was muted due to a big amount of warns ({jav_user.warnings})"
# )
# jav_user.warn(reason=reason)
# await user.send(
# embed=Embed(
# title="Тимчасове блокування",
# description=f"Причина: `{reason}`\n\nНа вашому рахунку вже {jav_user.warnings} попереджень. Вас було тимчасово заблоковано на **1 годину**.\n\nЯкщо Ви продовжите порушувати правила сервера згодом Вас заблокують.",
# color=0xDED56B,
# )
# )
# await user.timeout_for(timedelta(hours=1), reason=reason)
# else:
# jav_user.warn()
# await ctx.respond(
# embed=Embed(
# title="Попередження",
# description=f"{user.mention} Будь ласка, не порушуйте правила. Ви отримали попередження з причини `{reason}`.\n\nЯкщо Ви продовжите порушувати правила це може призвести до блокування в спільноті.",
# color=0xDED56B,
# )
# )
# else:
# logging.warning(
# f"User {guild_name(ctx.user)} tried to use /warn but permission denied"
# )
# await ctx.respond(
# embed=Embed(
# title="Відмовлено в доступі",
# description="Здається, це команда лише для модераторів",
# color=Color.fail,
# )
# )
# mod_role = ds_utils.get(
# ctx.user.guild.roles, id=await config_get("moderators", "roles")
# )
# admin_chan = ds_utils.get(
# ctx.user.guild.channels,
# id=await config_get("adminchat", "channels", "text"),
# )
# await admin_chan.send(
# content=f"{mod_role.mention}",
# embed=Embed(
# title="Неавторизований запит",
# description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
# color=Color.fail,
# ),
# )
@slash_command( @slash_command(
name="clear", name="clear",
description="Видалити деяку кількість повідомлень в каналі", description="Видалити деяку кількість повідомлень в каналі",
@@ -126,6 +42,10 @@ class Admin(commands.Cog):
amount: int, amount: int,
user: User, user: User,
) -> None: ) -> None:
"""Command /clear <amount> [<user>]
Removes last <amount> messages in the current channel. Optionally from a specific user.
"""
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
logging.info( logging.info(
"User %s removed %s message(s) in %s", "User %s removed %s message(s) in %s",
@@ -160,10 +80,10 @@ class Admin(commands.Cog):
) )
) )
mod_role: Union[Role, None] = ds_utils.get( mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan: Union[TextChannel, None] = ds_utils.get( admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
@@ -184,6 +104,10 @@ class Admin(commands.Cog):
guild_ids=[config_get("guild")], guild_ids=[config_get("guild")],
) )
async def reboot_cmd(self, ctx: ApplicationContext) -> None: async def reboot_cmd(self, ctx: ApplicationContext) -> None:
"""Command /reboot
Stops the bot. Is called "reboot" because it's assumed that the bot has automatic restart.
"""
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
@@ -216,10 +140,10 @@ class Admin(commands.Cog):
) )
) )
mod_role: Union[Role, None] = ds_utils.get( mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan: Union[TextChannel, None] = ds_utils.get( admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )

View File

@@ -16,11 +16,13 @@ class Analytics(commands.Cog):
@Cog.listener() @Cog.listener()
async def on_message(self, message: Message) -> None: async def on_message(self, message: Message) -> None:
"""Listener that collects analytical data (stickers, attachments, messages)."""
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot is False) and (message.author.bot is False)
and (message.author.system is False) and (message.author.system is False)
): ):
# Handle stickers
stickers: List[Dict[str, Any]] = [] stickers: List[Dict[str, Any]] = []
for sticker in message.stickers: for sticker in message.stickers:
@@ -33,6 +35,7 @@ class Analytics(commands.Cog):
} }
) )
# Handle attachments
attachments: List[Dict[str, Any]] = [] attachments: List[Dict[str, Any]] = []
for attachment in message.attachments: for attachment in message.attachments:
@@ -49,6 +52,7 @@ class Analytics(commands.Cog):
} }
) )
# Insert entry into the database
await col_analytics.insert_one( await col_analytics.insert_one(
{ {
"user": message.author.id, "user": message.author.id,

View File

@@ -1,5 +1,5 @@
import logging import logging
from typing import Any, Dict, Union from typing import Any, Dict
from discord import ApplicationContext, Embed, option, TextChannel, Role from discord import ApplicationContext, Embed, option, TextChannel, Role
from discord import utils as ds_utils from discord import utils as ds_utils
@@ -42,6 +42,10 @@ class CustomChannels(commands.Cog):
async def custom_channel_get_cmd( async def custom_channel_get_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
) -> None: ) -> None:
"""Command /customchannel get <name> <reactions> <threads>
Command to create a custom channel for a user.
"""
holo_user_ctx: HoloUser = HoloUser(ctx.user) holo_user_ctx: HoloUser = HoloUser(ctx.user)
# Return if the user is using the command outside of a guild # Return if the user is using the command outside of a guild
@@ -108,9 +112,7 @@ class CustomChannels(commands.Cog):
bots: Dict[str, Any] = await config_get("bots") bots: Dict[str, Any] = await config_get("bots")
for bot in bots: for bot in bots:
role: Union[Role, None] = ds_utils.get( role: Role | None = ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"])
ctx.user.guild.roles, id=bots[bot]["role"]
)
if role is not None: if role is not None:
await created_channel.set_permissions( await created_channel.set_permissions(
@@ -129,9 +131,13 @@ class CustomChannels(commands.Cog):
async def custom_channel_edit_cmd( async def custom_channel_edit_cmd(
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
) -> None: ) -> None:
"""Command /customchannel edit <name> <reactions> <threads>
Command to change properties of a custom channel.
"""
holo_user_ctx: HoloUser = HoloUser(ctx.user) holo_user_ctx: HoloUser = HoloUser(ctx.user)
custom_channel: Union[TextChannel, None] = ds_utils.get( custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel ctx.guild.channels, id=holo_user_ctx.customchannel
) )
@@ -172,6 +178,9 @@ class CustomChannels(commands.Cog):
async def custom_channel_remove_cmd( async def custom_channel_remove_cmd(
self, ctx: ApplicationContext, confirm: bool = False self, ctx: ApplicationContext, confirm: bool = False
) -> None: ) -> None:
"""Command /customchannel remove [<confirm>]
Command to remove a custom channel. Requires additional confirmation."""
holo_user_ctx: HoloUser = HoloUser(ctx.user) holo_user_ctx: HoloUser = HoloUser(ctx.user)
# Return if the user does not have a custom channel # Return if the user does not have a custom channel
@@ -188,7 +197,7 @@ class CustomChannels(commands.Cog):
await ctx.defer() await ctx.defer()
custom_channel: Union[TextChannel, None] = ds_utils.get( custom_channel: TextChannel | None = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel ctx.guild.channels, id=holo_user_ctx.customchannel
) )

View File

@@ -2,7 +2,7 @@ import logging
from logging import Logger from logging import Logger
from os import makedirs from os import makedirs
from pathlib import Path from pathlib import Path
from typing import Union, List, Dict, Any from typing import List, Dict, Any
from uuid import uuid4 from uuid import uuid4
from discord import ApplicationContext, Embed, File, option, Role, TextChannel from discord import ApplicationContext, Embed, File, option, Role, TextChannel
@@ -35,6 +35,9 @@ class Data(commands.Cog):
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None: async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None:
"""Command /data export <kind>
Command to export specific kind of data."""
await ctx.defer() await ctx.defer()
# Return if the user is not an owner and not in the council # Return if the user is not an owner and not in the council
@@ -54,10 +57,10 @@ class Data(commands.Cog):
) )
) )
mod_role: Union[Role, None] = ds_utils.get( mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan: Union[TextChannel, None] = ds_utils.get( admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
@@ -92,6 +95,8 @@ class Data(commands.Cog):
} }
) )
# Temporary file must be written synchronously,
# otherwise it will not be there when ctx.respond() is be called
json_write(users, Path(f"tmp/{uuid}")) json_write(users, Path(f"tmp/{uuid}"))
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json")) await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
@@ -105,6 +110,11 @@ class Data(commands.Cog):
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None: async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None:
"""Command /migrate <kind>
Command to migrate specific kind of data.
Migration of users in this case means creation of their DB entries."""
await ctx.defer() await ctx.defer()
# Return if the user is not an owner and not in the council # Return if the user is not an owner and not in the council
@@ -124,10 +134,10 @@ class Data(commands.Cog):
) )
) )
mod_role: Union[Role, None] = ds_utils.get( mod_role: Role | None = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
admin_chan: Union[TextChannel, None] = ds_utils.get( admin_chan: TextChannel | None = ds_utils.get(
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )

View File

@@ -27,6 +27,9 @@ class Fun(commands.Cog):
) )
@option("user", description="Користувач") @option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None: async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None:
"""Command /action <type> <user>
Command to perform some RP action on a user and send them a GIF."""
await ctx.defer() await ctx.defer()
action: str = await config_get("category", "actions", type) action: str = await config_get("category", "actions", type)

View File

@@ -1,6 +1,7 @@
from typing import Dict, Any, Union import logging
from typing import Dict, Any
from discord import Member, Message, TextChannel from discord import Member, Message, TextChannel, MessageType
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot.utils import config_get from libbot.utils import config_get
@@ -8,6 +9,8 @@ from libbot.utils import config_get
from classes.holo_bot import HoloBot from classes.holo_bot import HoloBot
from modules.database import col_users from modules.database import col_users
logger = logging.getLogger(__name__)
class Logger(commands.Cog): class Logger(commands.Cog):
def __init__(self, client: HoloBot): def __init__(self, client: HoloBot):
@@ -15,6 +18,7 @@ class Logger(commands.Cog):
@commands.Cog.listener() @commands.Cog.listener()
async def on_message(self, message: Message): async def on_message(self, message: Message):
"""Message listener. All actions on messages remain here for now."""
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot is False) and (message.author.bot is False)
@@ -31,17 +35,31 @@ class Logger(commands.Cog):
await col_users.insert_one(document=user) await col_users.insert_one(document=user)
if (
(message.type == MessageType.thread_created)
and (message.channel is not None)
and (
await col_users.count_documents({"customchannel": message.channel.id})
> 0
)
):
await message.delete()
@commands.Cog.listener() @commands.Cog.listener()
async def on_member_join(self, member: Member) -> None: async def on_member_join(self, member: Member) -> None:
welcome_chan: Union[TextChannel, None] = ds_utils.get( """Member join handler. All actions on member join remain here for now."""
welcome_chan: TextChannel | None = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels, self.client.get_guild(await config_get("guild")).channels,
id=await config_get("welcome", "channels", "text"), id=await config_get("welcome", "channels", "text"),
) )
rules_chan: Union[TextChannel, None] = ds_utils.get( rules_chan: TextChannel | None = ds_utils.get(
self.client.get_guild(await config_get("guild")).channels, self.client.get_guild(await config_get("guild")).channels,
id=await config_get("rules", "channels", "text"), id=await config_get("rules", "channels", "text"),
) )
if welcome_chan is None:
logger.warning("Could not find a welcome channel by its id")
if ( if (
(member != self.client.user) (member != self.client.user)
and (member.bot is False) and (member.bot is False)

View File

@@ -16,6 +16,7 @@ class Utility(commands.Cog):
@commands.Cog.listener() @commands.Cog.listener()
async def on_ready(self) -> None: async def on_ready(self) -> None:
"""Listener for the event when bot connects to Discord and becomes "ready"."""
logger.info("Logged in as %s", self.client.user) logger.info("Logged in as %s", self.client.user)
activity_type: str = await config_get("type", "status") activity_type: str = await config_get("type", "status")

View File

@@ -1,9 +1,7 @@
from typing import Union
from discord import Member, User from discord import Member, User
def guild_name(member: Union[Member, User]) -> str: def guild_name(member: Member | User) -> str:
if isinstance(member, User): if isinstance(member, User):
return member.name return member.name

View File

@@ -6,5 +6,6 @@ aiofiles~=24.1.0
apscheduler>=3.10.0 apscheduler>=3.10.0
async_pymongo==0.1.11 async_pymongo==0.1.11
libbot[speed,pycord]==4.0.0 libbot[speed,pycord]==4.0.0
typing-extensions~=4.12.2
ujson~=5.10.0 ujson~=5.10.0
WaifuPicsPython==0.2.0 WaifuPicsPython==0.2.0