2 Commits

Author SHA1 Message Date
kku
41112018da Working on #13 2024-12-15 23:36:48 +01:00
kku
982d0bce43 General improvements and refactoring 2024-12-15 23:21:41 +01:00
10 changed files with 289 additions and 302 deletions

View File

@@ -3,25 +3,20 @@ from typing import Any, Union
import discord
import discord.member
from libbot import config_get
from modules.database import col_users, col_warnings
from modules.utils import config_get
logger = logging.getLogger(__name__)
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found"
)
@@ -63,10 +58,8 @@ class HoloUser:
* `int`: Number of warnings
"""
warns = col_warnings.find_one({"user": self.id})
if warns == None:
return 0
else:
return warns["warns"]
return 0 if warns is None else warns["warns"]
def warn(self, count=1, reason: str = "Not provided") -> None:
"""Warn and add count to warns number
@@ -75,13 +68,15 @@ class HoloUser:
* `count` (int, optional): Count of warnings to be added. Defaults to 1.
"""
warns = col_warnings.find_one({"user": self.id})
if warns != None:
if warns is not None:
col_warnings.update_one(
filter={"_id": self.db_id},
update={"$set": {"warns": warns["warns"] + count}},
)
else:
col_warnings.insert_one(document={"user": self.id, "warns": count})
logger.info(f"User {self.id} was warned {count} times due to: {reason}")
def set(self, key: str, value: Any) -> None:
@@ -93,10 +88,12 @@ class HoloUser:
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
col_users.update_one(
filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
)
logger.info(f"Set attribute {key} of user {self.id} to {value}")
async def is_moderator(
@@ -112,11 +109,14 @@ class HoloUser:
"""
if isinstance(member, discord.User):
return False
moderator_role = await config_get("moderators", "roles")
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id == moderator_role or role.id == council_role:
return True
return False
async def is_council(
@@ -132,10 +132,13 @@ class HoloUser:
"""
if isinstance(member, discord.User):
return False
council_role = await config_get("council", "roles")
for role in member.roles:
if role.id == council_role:
return True
return False
# def purge(self) -> None:

View File

@@ -1,22 +1,27 @@
import logging
import sys
from discord import ApplicationContext, Embed, User, option, slash_command
from discord import utils as ds_utils
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from enums.colors import Color
from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
from modules.utils_sync import guild_name
logger = logging.getLogger(__name__)
class Admin(commands.Cog):
"""Cog with utility commands for admins."""
def __init__(self, client: PycordBot):
self.client = client
# Disabled because warning functionality is temporarily not needed
# @slash_command(
# name="warning",
# description="Попередити юзера про порушення правил",
@@ -102,7 +107,7 @@ class Admin(commands.Cog):
@slash_command(
name="clear",
description="Видалити деяку кількість повідомлень в каналі",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("amount", description="Кількість")
@option("user", description="Користувач", default=None)
@@ -119,20 +124,25 @@ class Admin(commands.Cog):
amount,
ctx.channel.id,
)
await ctx.respond(
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
)
if user == None:
if user is None:
await ctx.channel.purge(limit=amount)
else:
await ctx.channel.purge(
limit=amount, check=lambda msg: msg.author == user
)
else:
return
logging.warning(
"User %s tried to use /clear but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
@@ -140,6 +150,7 @@ class Admin(commands.Cog):
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
@@ -147,6 +158,7 @@ class Admin(commands.Cog):
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
@@ -159,10 +171,11 @@ class Admin(commands.Cog):
@slash_command(
name="reboot",
description="Перезапустити бота",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
async def reboot_cmd(self, ctx: ApplicationContext):
await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids:
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
await ctx.respond(
@@ -173,12 +186,13 @@ class Admin(commands.Cog):
)
scheduler.shutdown()
await self.client.close()
exit()
else:
sys.exit()
logging.warning(
"User %s tried to use /reboot but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
@@ -186,6 +200,7 @@ class Admin(commands.Cog):
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
@@ -193,6 +208,7 @@ class Admin(commands.Cog):
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(

View File

@@ -17,8 +17,8 @@ class Analytics(commands.Cog):
async def on_message(self, message: Message):
if (
(message.author != self.client.user)
and (message.author.bot == False)
and (message.author.system == False)
and (message.author.bot is False)
and (message.author.system is False)
):
stickers = []
for sticker in message.stickers:

View File

@@ -3,13 +3,14 @@ from discord import utils as ds_utils
from discord.abc import GuildChannel
from discord.commands import SlashCommandGroup
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from classes.holo_user import HoloUser
from enums.colors import Color
from modules.database import col_users
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
from modules.utils_sync import guild_name
class CustomChannels(commands.Cog):
@@ -27,7 +28,7 @@ class CustomChannels(commands.Cog):
@customchannel.command(
name="get",
description="Отримати персональний текстовий канал",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@@ -37,8 +38,20 @@ class CustomChannels(commands.Cog):
):
holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel == None:
# Return if the user already has a custom channel
if holo_user_ctx.customchannel is not None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.fail,
)
)
return
await ctx.defer()
created_channel = await ctx.user.guild.create_text_channel(
name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
@@ -47,6 +60,7 @@ class CustomChannels(commands.Cog):
id=await config_get("customchannels", "categories"),
),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
@@ -62,7 +76,9 @@ class CustomChannels(commands.Cog):
embed_links=True,
manage_channels=True,
)
holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond(
embed=Embed(
title="Створено канал",
@@ -70,26 +86,19 @@ class CustomChannels(commands.Cog):
color=Color.success,
)
)
bots = await config_get("bots")
for bot in bots:
await created_channel.set_permissions(
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
else:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
color=Color.fail,
)
)
@customchannel.command(
name="edit",
description="Змінити параметри особистого каналу",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@@ -102,6 +111,8 @@ class CustomChannels(commands.Cog):
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
@@ -111,6 +122,7 @@ class CustomChannels(commands.Cog):
)
)
return
await custom_channel.edit(name=name)
await custom_channel.set_permissions(
ctx.user.guild.default_role,
@@ -119,6 +131,7 @@ class CustomChannels(commands.Cog):
create_public_threads=threads,
create_private_threads=threads,
)
await ctx.respond(
embed=Embed(
title="Канал змінено",
@@ -130,7 +143,7 @@ class CustomChannels(commands.Cog):
@customchannel.command(
name="remove",
description="Відібрати канал, знищуючи його, та частково повернути кошти",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option("confirm", description="Підтвердження операції")
async def customchannel_remove_cmd(
@@ -138,11 +151,25 @@ class CustomChannels(commands.Cog):
):
holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel is not None:
# Return if the user does not have a custom channel
if holo_user_ctx.customchannel is None:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас немає особистого каналу.",
color=Color.fail,
)
)
return
await ctx.defer()
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
@@ -153,6 +180,8 @@ class CustomChannels(commands.Cog):
)
holo_user_ctx.set("customchannel", None)
return
# Return if the confirmation is missing
if not confirm:
await ctx.respond(
embed=Embed(
@@ -162,8 +191,11 @@ class CustomChannels(commands.Cog):
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
@@ -171,15 +203,6 @@ class CustomChannels(commands.Cog):
color=Color.default,
)
)
else:
await ctx.defer(ephemeral=True)
await ctx.respond(
embed=Embed(
title="Помилка виконання",
description="У вас немає особистого каналу.",
color=Color.fail,
)
)
def setup(client: PycordBot):

View File

@@ -7,13 +7,15 @@ from discord import ApplicationContext, Embed, File, option
from discord import utils as ds_utils
from discord.commands import SlashCommandGroup
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from libbot.sync import json_write as sync_json_write
from classes.holo_user import HoloUser
from enums.colors import Color
from modules.database import col_users
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
from modules.utils_sync import guild_name
logger = logging.getLogger(__name__)
@@ -27,7 +29,7 @@ class Data(commands.Cog):
@data.command(
name="export",
description="Експортувати дані",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
@@ -35,13 +37,47 @@ class Data(commands.Cog):
async def data_export_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or (
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await holo_user.is_council(ctx.author)
):
logging.info(
"Moderator %s exported current users list", guild_name(ctx.user)
"User %s tried to use /export but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
return
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
makedirs("tmp", exist_ok=True)
uuid = str(uuid4())
if kind == "Користувачі":
@@ -57,16 +93,32 @@ class Data(commands.Cog):
}
)
json_write_sync(users, str(Path(f"tmp/{uuid}")))
sync_json_write(users, Path(f"tmp/{uuid}"))
await ctx.respond(
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
@data.command(
name="migrate",
description="Мігрувати всіх користувачів до бази",
guild_ids=[sync_config_get("guild")],
)
else:
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer()
holo_user = HoloUser(ctx.author)
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await holo_user.is_council(ctx.author)
):
logging.info(
"User %s tried to use /export but permission denied",
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
@@ -74,6 +126,7 @@ class Data(commands.Cog):
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
@@ -81,6 +134,7 @@ class Data(commands.Cog):
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
@@ -90,20 +144,8 @@ class Data(commands.Cog):
),
)
@data.command(
name="migrate",
description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get_sync("guild")],
)
@option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
)
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer()
holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or (
await holo_user.is_council(ctx.author)
):
return
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
@@ -113,6 +155,7 @@ class Data(commands.Cog):
for member in ctx.guild.members:
if member.bot:
continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
@@ -123,6 +166,7 @@ class Data(commands.Cog):
user[key] = defaults[key]
col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
@@ -134,33 +178,6 @@ class Data(commands.Cog):
color=Color.success,
)
)
else:
logging.info(
"User %s tried to use /migrate but permission denied",
guild_name(ctx.user),
)
await ctx.respond(
embed=Embed(
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів",
color=Color.fail,
)
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
def setup(client: PycordBot):

View File

@@ -1,12 +1,13 @@
import logging
from WaifuPicsPython import WaifuAsync
from discord import ApplicationContext, Embed, User, option, slash_command
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from WaifuPicsPython import WaifuAsync
from libbot.sync import config_get as sync_config_get
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
from modules.utils_sync import guild_name
logger = logging.getLogger(__name__)
@@ -20,12 +21,12 @@ class Fun(commands.Cog):
@slash_command(
name="action",
description="Провести над користувачем РП дію",
guild_ids=[config_get_sync("guild")],
guild_ids=[sync_config_get("guild")],
)
@option(
"type",
description="Тип дії, яку хочете провести з користувачем",
choices=config_get_sync("actions").keys(),
choices=sync_config_get("actions").keys(),
)
@option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User):

View File

@@ -1,10 +1,10 @@
from discord import Member, Message
from discord import utils as ds_utils
from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot
from modules.database import col_users
from modules.utils import config_get
class Logger(commands.Cog):
@@ -15,8 +15,8 @@ class Logger(commands.Cog):
async def on_message(self, message: Message):
if (
(message.author != self.client.user)
and (message.author.bot == False)
and (message.author.system == False)
and (message.author.bot is False)
and (message.author.system is False)
):
if col_users.find_one({"user": message.author.id}) is None:
user = {}
@@ -42,8 +42,8 @@ class Logger(commands.Cog):
if (
(member != self.client.user)
and (member.bot == False)
and (member.system == False)
and (member.bot is False)
and (member.system is False)
):
await welcome_chan.send(
content=(await config_get("welcome", "messages")).format(

View File

@@ -1,11 +1,11 @@
import logging
from discord import Activity, ActivityType
from libbot import config_get
from libbot.sync import config_get as sync_config_get
from modules.client import client
from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync
logging.basicConfig(
level=logging.INFO,
@@ -67,7 +67,7 @@ def main():
try:
scheduler.start()
client.run(config_get_sync("bot_token", "bot"))
client.run(sync_config_get("bot_token", "bot"))
except KeyboardInterrupt:
scheduler.shutdown()
exit()

View File

@@ -1,38 +0,0 @@
from typing import Any
import aiofiles
from ujson import dumps, loads
async def json_read(path: str) -> Any:
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: str) -> None:
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
)
async def config_get(key: str, *path: str) -> Any:
this_key = await json_read("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(key: str, value: Any, *path: str) -> None:
this_dict = await json_read("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
await json_write(this_dict, "config.json")
return

View File

@@ -1,45 +1,10 @@
from typing import Any, Union
from typing import Union
from discord import Member, User
from ujson import dumps, loads
def json_read_sync(path: str) -> Any:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Union[Member, User]) -> str:
if isinstance(member, User):
return member.name
if member.nick == None:
return member.name
else:
return member.nick
return member.name if member.nick is None else member.nick