2 Commits

Author SHA1 Message Date
kku
41112018da Working on #13 2024-12-15 23:36:48 +01:00
kku
982d0bce43 General improvements and refactoring 2024-12-15 23:21:41 +01:00
10 changed files with 289 additions and 302 deletions

View File

@@ -3,25 +3,20 @@ from typing import Any, Union
import discord import discord
import discord.member import discord.member
from libbot import config_get
from modules.database import col_users, col_warnings from modules.database import col_users, col_warnings
from modules.utils import config_get
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception): class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database""" """HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id): def __init__(self, user, user_id):
self.user = user self.user = user
self.user_id = user_id self.user_id = user_id
super().__init__( super().__init__(
f"User of type {type(self.user)} with id {self.user_id} was not found" f"User of type {type(self.user)} with id {self.user_id} was not found"
) )
@@ -63,10 +58,8 @@ class HoloUser:
* `int`: Number of warnings * `int`: Number of warnings
""" """
warns = col_warnings.find_one({"user": self.id}) warns = col_warnings.find_one({"user": self.id})
if warns == None:
return 0 return 0 if warns is None else warns["warns"]
else:
return warns["warns"]
def warn(self, count=1, reason: str = "Not provided") -> None: def warn(self, count=1, reason: str = "Not provided") -> None:
"""Warn and add count to warns number """Warn and add count to warns number
@@ -75,13 +68,15 @@ class HoloUser:
* `count` (int, optional): Count of warnings to be added. Defaults to 1. * `count` (int, optional): Count of warnings to be added. Defaults to 1.
""" """
warns = col_warnings.find_one({"user": self.id}) warns = col_warnings.find_one({"user": self.id})
if warns != None:
if warns is not None:
col_warnings.update_one( col_warnings.update_one(
filter={"_id": self.db_id}, filter={"_id": self.db_id},
update={"$set": {"warns": warns["warns"] + count}}, update={"$set": {"warns": warns["warns"] + count}},
) )
else: else:
col_warnings.insert_one(document={"user": self.id, "warns": count}) col_warnings.insert_one(document={"user": self.id, "warns": count})
logger.info(f"User {self.id} was warned {count} times due to: {reason}") logger.info(f"User {self.id} was warned {count} times due to: {reason}")
def set(self, key: str, value: Any) -> None: def set(self, key: str, value: Any) -> None:
@@ -93,10 +88,12 @@ class HoloUser:
""" """
if not hasattr(self, key): if not hasattr(self, key):
raise AttributeError() raise AttributeError()
setattr(self, key, value) setattr(self, key, value)
col_users.update_one( col_users.update_one(
filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
) )
logger.info(f"Set attribute {key} of user {self.id} to {value}") logger.info(f"Set attribute {key} of user {self.id} to {value}")
async def is_moderator( async def is_moderator(
@@ -112,11 +109,14 @@ class HoloUser:
""" """
if isinstance(member, discord.User): if isinstance(member, discord.User):
return False return False
moderator_role = await config_get("moderators", "roles") moderator_role = await config_get("moderators", "roles")
council_role = await config_get("council", "roles") council_role = await config_get("council", "roles")
for role in member.roles: for role in member.roles:
if role.id == moderator_role or role.id == council_role: if role.id == moderator_role or role.id == council_role:
return True return True
return False return False
async def is_council( async def is_council(
@@ -132,10 +132,13 @@ class HoloUser:
""" """
if isinstance(member, discord.User): if isinstance(member, discord.User):
return False return False
council_role = await config_get("council", "roles") council_role = await config_get("council", "roles")
for role in member.roles: for role in member.roles:
if role.id == council_role: if role.id == council_role:
return True return True
return False return False
# def purge(self) -> None: # def purge(self) -> None:

View File

@@ -1,22 +1,27 @@
import logging import logging
import sys
from discord import ApplicationContext, Embed, User, option, slash_command from discord import ApplicationContext, Embed, User, option, slash_command
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from enums.colors import Color from enums.colors import Color
from modules.scheduled import scheduler from modules.scheduled import scheduler
from modules.utils import config_get from modules.utils_sync import guild_name
from modules.utils_sync import config_get_sync, guild_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Admin(commands.Cog): class Admin(commands.Cog):
"""Cog with utility commands for admins."""
def __init__(self, client: PycordBot): def __init__(self, client: PycordBot):
self.client = client self.client = client
# Disabled because warning functionality is temporarily not needed
# @slash_command( # @slash_command(
# name="warning", # name="warning",
# description="Попередити юзера про порушення правил", # description="Попередити юзера про порушення правил",
@@ -102,7 +107,7 @@ class Admin(commands.Cog):
@slash_command( @slash_command(
name="clear", name="clear",
description="Видалити деяку кількість повідомлень в каналі", description="Видалити деяку кількість повідомлень в каналі",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option("amount", description="Кількість") @option("amount", description="Кількість")
@option("user", description="Користувач", default=None) @option("user", description="Користувач", default=None)
@@ -119,50 +124,58 @@ class Admin(commands.Cog):
amount, amount,
ctx.channel.id, ctx.channel.id,
) )
await ctx.respond( await ctx.respond(
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0 embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
) )
if user == None:
if user is None:
await ctx.channel.purge(limit=amount) await ctx.channel.purge(limit=amount)
else: else:
await ctx.channel.purge( await ctx.channel.purge(
limit=amount, check=lambda msg: msg.author == user limit=amount, check=lambda msg: msg.author == user
) )
else:
logging.warning( return
"User %s tried to use /clear but permission denied",
guild_name(ctx.user), logging.warning(
) "User %s tried to use /clear but permission denied",
await ctx.respond( guild_name(ctx.user),
embed=Embed( )
title="Відмовлено в доступі",
description="Здається, це команда лише для модераторів", await ctx.respond(
color=Color.fail, embed=Embed(
) title="Відмовлено в доступі",
) description="Здається, це команда лише для модераторів",
mod_role = ds_utils.get( color=Color.fail,
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
) )
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
@slash_command( @slash_command(
name="reboot", name="reboot",
description="Перезапустити бота", description="Перезапустити бота",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
async def reboot_cmd(self, ctx: ApplicationContext): async def reboot_cmd(self, ctx: ApplicationContext):
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
if ctx.user.id in self.client.owner_ids: if ctx.user.id in self.client.owner_ids:
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user)) logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
await ctx.respond( await ctx.respond(
@@ -173,34 +186,37 @@ class Admin(commands.Cog):
) )
scheduler.shutdown() scheduler.shutdown()
await self.client.close() await self.client.close()
exit() sys.exit()
else:
logging.warning( logging.warning(
"User %s tried to use /reboot but permission denied", "User %s tried to use /reboot but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond(
embed=Embed( await ctx.respond(
title="Відмовлено в доступі", embed=Embed(
description="Здається, це команда лише для модераторів", title="Відмовлено в доступі",
color=Color.fail, description="Здається, це команда лише для модераторів",
) color=Color.fail,
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
) )
)
mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles")
)
admin_chan = ds_utils.get(
ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"),
)
await admin_chan.send(
content=f"{mod_role.mention}",
embed=Embed(
title="Неавторизований запит",
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
color=Color.fail,
),
)
def setup(client: PycordBot): def setup(client: PycordBot):

View File

@@ -17,8 +17,8 @@ class Analytics(commands.Cog):
async def on_message(self, message: Message): async def on_message(self, message: Message):
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot == False) and (message.author.bot is False)
and (message.author.system == False) and (message.author.system is False)
): ):
stickers = [] stickers = []
for sticker in message.stickers: for sticker in message.stickers:

View File

@@ -3,13 +3,14 @@ from discord import utils as ds_utils
from discord.abc import GuildChannel from discord.abc import GuildChannel
from discord.commands import SlashCommandGroup from discord.commands import SlashCommandGroup
from discord.ext import commands from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from classes.holo_user import HoloUser from classes.holo_user import HoloUser
from enums.colors import Color from enums.colors import Color
from modules.database import col_users from modules.database import col_users
from modules.utils import config_get from modules.utils_sync import guild_name
from modules.utils_sync import config_get_sync, guild_name
class CustomChannels(commands.Cog): class CustomChannels(commands.Cog):
@@ -27,7 +28,7 @@ class CustomChannels(commands.Cog):
@customchannel.command( @customchannel.command(
name="get", name="get",
description="Отримати персональний текстовий канал", description="Отримати персональний текстовий канал",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option("name", description="Назва каналу") @option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції") @option("reactions", description="Дозволити реакції")
@@ -37,46 +38,8 @@ class CustomChannels(commands.Cog):
): ):
holo_user_ctx = HoloUser(ctx.user) holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel == None: # Return if the user already has a custom channel
await ctx.defer() if holo_user_ctx.customchannel is not None:
created_channel = await ctx.user.guild.create_text_channel(
name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get(
ctx.author.guild.categories,
id=await config_get("customchannels", "categories"),
),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
add_reactions=reactions,
create_public_threads=threads,
create_private_threads=threads,
)
await created_channel.set_permissions(
ctx.user,
attach_files=True,
manage_messages=True,
send_messages=True,
embed_links=True,
manage_channels=True,
)
holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond(
embed=Embed(
title="Створено канал",
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.success,
)
)
bots = await config_get("bots")
for bot in bots:
await created_channel.set_permissions(
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
else:
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
@@ -85,11 +48,57 @@ class CustomChannels(commands.Cog):
color=Color.fail, color=Color.fail,
) )
) )
return
await ctx.defer()
created_channel = await ctx.user.guild.create_text_channel(
name=name,
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
category=ds_utils.get(
ctx.author.guild.categories,
id=await config_get("customchannels", "categories"),
),
)
await created_channel.set_permissions(
ctx.user.guild.default_role,
send_messages=False,
add_reactions=reactions,
create_public_threads=threads,
create_private_threads=threads,
)
await created_channel.set_permissions(
ctx.user,
attach_files=True,
manage_messages=True,
send_messages=True,
embed_links=True,
manage_channels=True,
)
holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond(
embed=Embed(
title="Створено канал",
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
color=Color.success,
)
)
bots = await config_get("bots")
for bot in bots:
await created_channel.set_permissions(
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
view_channel=False,
)
@customchannel.command( @customchannel.command(
name="edit", name="edit",
description="Змінити параметри особистого каналу", description="Змінити параметри особистого каналу",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option("name", description="Назва каналу") @option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції") @option("reactions", description="Дозволити реакції")
@@ -102,6 +111,8 @@ class CustomChannels(commands.Cog):
custom_channel = ds_utils.get( custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel ctx.guild.channels, id=holo_user_ctx.customchannel
) )
# Return if the channel was not found
if custom_channel is None: if custom_channel is None:
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
@@ -111,6 +122,7 @@ class CustomChannels(commands.Cog):
) )
) )
return return
await custom_channel.edit(name=name) await custom_channel.edit(name=name)
await custom_channel.set_permissions( await custom_channel.set_permissions(
ctx.user.guild.default_role, ctx.user.guild.default_role,
@@ -119,6 +131,7 @@ class CustomChannels(commands.Cog):
create_public_threads=threads, create_public_threads=threads,
create_private_threads=threads, create_private_threads=threads,
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Канал змінено", title="Канал змінено",
@@ -130,7 +143,7 @@ class CustomChannels(commands.Cog):
@customchannel.command( @customchannel.command(
name="remove", name="remove",
description="Відібрати канал, знищуючи його, та частково повернути кошти", description="Відібрати канал, знищуючи його, та частково повернути кошти",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option("confirm", description="Підтвердження операції") @option("confirm", description="Підтвердження операції")
async def customchannel_remove_cmd( async def customchannel_remove_cmd(
@@ -138,40 +151,8 @@ class CustomChannels(commands.Cog):
): ):
holo_user_ctx = HoloUser(ctx.user) holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel is not None: # Return if the user does not have a custom channel
await ctx.defer() if holo_user_ctx.customchannel is None:
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail,
)
)
holo_user_ctx.set("customchannel", None)
return
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.fail,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.default,
)
)
else:
await ctx.defer(ephemeral=True) await ctx.defer(ephemeral=True)
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
@@ -180,6 +161,48 @@ class CustomChannels(commands.Cog):
color=Color.fail, color=Color.fail,
) )
) )
return
await ctx.defer()
custom_channel = ds_utils.get(
ctx.guild.channels, id=holo_user_ctx.customchannel
)
# Return if the channel was not found
if custom_channel is None:
await ctx.respond(
embed=Embed(
title="Канал не знайдено",
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
color=Color.fail,
)
)
holo_user_ctx.set("customchannel", None)
return
# Return if the confirmation is missing
if not confirm:
await ctx.respond(
embed=Embed(
title="Підтвердження не надано",
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
color=Color.fail,
)
)
return
await custom_channel.delete(reason="Власник запросив видалення")
holo_user_ctx.set("customchannel", None)
await ctx.respond(
embed=Embed(
title="Канал знищено",
description="Ви відмовились від каналу та видалили його.",
color=Color.default,
)
)
def setup(client: PycordBot): def setup(client: PycordBot):

View File

@@ -7,13 +7,15 @@ from discord import ApplicationContext, Embed, File, option
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.commands import SlashCommandGroup from discord.commands import SlashCommandGroup
from discord.ext import commands from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot from libbot.pycord.classes import PycordBot
from libbot.sync import config_get as sync_config_get
from libbot.sync import json_write as sync_json_write
from classes.holo_user import HoloUser from classes.holo_user import HoloUser
from enums.colors import Color from enums.colors import Color
from modules.database import col_users from modules.database import col_users
from modules.utils import config_get from modules.utils_sync import guild_name
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -27,7 +29,7 @@ class Data(commands.Cog):
@data.command( @data.command(
name="export", name="export",
description="Експортувати дані", description="Експортувати дані",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option( @option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
@@ -35,38 +37,16 @@ class Data(commands.Cog):
async def data_export_cmd(self, ctx: ApplicationContext, kind: str): async def data_export_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer() await ctx.defer()
holo_user = HoloUser(ctx.author) holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or (
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await holo_user.is_council(ctx.author) await holo_user.is_council(ctx.author)
): ):
logging.info(
"Moderator %s exported current users list", guild_name(ctx.user)
)
makedirs("tmp", exist_ok=True)
uuid = str(uuid4())
if kind == "Користувачі":
users = []
for member in ctx.guild.members:
users.append(
{
"id": member.id,
"nick": member.nick,
"username": f"{member.name}#{member.discriminator}",
"bot": member.bot,
}
)
json_write_sync(users, str(Path(f"tmp/{uuid}")))
await ctx.respond(
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
)
else:
logging.info( logging.info(
"User %s tried to use /export but permission denied", "User %s tried to use /export but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Відмовлено в доступі", title="Відмовлено в доступі",
@@ -74,6 +54,7 @@ class Data(commands.Cog):
color=Color.fail, color=Color.fail,
) )
) )
mod_role = ds_utils.get( mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
@@ -81,6 +62,7 @@ class Data(commands.Cog):
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
await admin_chan.send( await admin_chan.send(
content=f"{mod_role.mention}", content=f"{mod_role.mention}",
embed=Embed( embed=Embed(
@@ -90,55 +72,53 @@ class Data(commands.Cog):
), ),
) )
return
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
makedirs("tmp", exist_ok=True)
uuid = str(uuid4())
if kind == "Користувачі":
users = []
for member in ctx.guild.members:
users.append(
{
"id": member.id,
"nick": member.nick,
"username": f"{member.name}#{member.discriminator}",
"bot": member.bot,
}
)
sync_json_write(users, Path(f"tmp/{uuid}"))
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
@data.command( @data.command(
name="migrate", name="migrate",
description="Мігрувати всіх користувачів до бази", description="Мігрувати всіх користувачів до бази",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option( @option(
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"] "kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
) )
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str): async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
await ctx.defer() await ctx.defer()
holo_user = HoloUser(ctx.author) holo_user = HoloUser(ctx.author)
if (ctx.user.id in self.client.owner_ids) or (
# Return if the user is not an owner and not in the council
if (ctx.user.id not in self.client.owner_ids) and not (
await holo_user.is_council(ctx.author) await holo_user.is_council(ctx.author)
): ):
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
)
if kind == "Користувачі":
for member in ctx.guild.members:
if member.bot:
continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
await ctx.respond(
embed=Embed(
title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.success,
)
)
else:
logging.info( logging.info(
"User %s tried to use /migrate but permission denied", "User %s tried to use /migrate but permission denied",
guild_name(ctx.user), guild_name(ctx.user),
) )
await ctx.respond( await ctx.respond(
embed=Embed( embed=Embed(
title="Відмовлено в доступі", title="Відмовлено в доступі",
@@ -146,6 +126,7 @@ class Data(commands.Cog):
color=Color.fail, color=Color.fail,
) )
) )
mod_role = ds_utils.get( mod_role = ds_utils.get(
ctx.user.guild.roles, id=await config_get("moderators", "roles") ctx.user.guild.roles, id=await config_get("moderators", "roles")
) )
@@ -153,6 +134,7 @@ class Data(commands.Cog):
ctx.user.guild.channels, ctx.user.guild.channels,
id=await config_get("adminchat", "channels", "text"), id=await config_get("adminchat", "channels", "text"),
) )
await admin_chan.send( await admin_chan.send(
content=f"{mod_role.mention}", content=f"{mod_role.mention}",
embed=Embed( embed=Embed(
@@ -162,6 +144,41 @@ class Data(commands.Cog):
), ),
) )
return
logging.info(
"Moderator %s started migration of all members to the database",
guild_name(ctx.user),
)
if kind == "Користувачі":
for member in ctx.guild.members:
if member.bot:
continue
if col_users.find_one({"user": member.id}) is None:
user = {}
defaults = await config_get("user", "defaults")
user["user"] = member.id
for key in defaults:
user[key] = defaults[key]
col_users.insert_one(document=user)
logging.info(
"Added DB record for user %s during migration", member.id
)
await ctx.respond(
embed=Embed(
title="Міграцію завершено",
description="Всім користувачам сервера було створено записи в базі даних.",
color=Color.success,
)
)
def setup(client: PycordBot): def setup(client: PycordBot):
client.add_cog(Data(client)) client.add_cog(Data(client))

View File

@@ -1,12 +1,13 @@
import logging import logging
from WaifuPicsPython import WaifuAsync
from discord import ApplicationContext, Embed, User, option, slash_command from discord import ApplicationContext, Embed, User, option, slash_command
from discord.ext import commands from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot from libbot.pycord.classes import PycordBot
from WaifuPicsPython import WaifuAsync from libbot.sync import config_get as sync_config_get
from modules.utils import config_get from modules.utils_sync import guild_name
from modules.utils_sync import config_get_sync, guild_name
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -20,12 +21,12 @@ class Fun(commands.Cog):
@slash_command( @slash_command(
name="action", name="action",
description="Провести над користувачем РП дію", description="Провести над користувачем РП дію",
guild_ids=[config_get_sync("guild")], guild_ids=[sync_config_get("guild")],
) )
@option( @option(
"type", "type",
description="Тип дії, яку хочете провести з користувачем", description="Тип дії, яку хочете провести з користувачем",
choices=config_get_sync("actions").keys(), choices=sync_config_get("actions").keys(),
) )
@option("user", description="Користувач") @option("user", description="Користувач")
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User): async def action_cmd(self, ctx: ApplicationContext, type: str, user: User):

View File

@@ -1,10 +1,10 @@
from discord import Member, Message from discord import Member, Message
from discord import utils as ds_utils from discord import utils as ds_utils
from discord.ext import commands from discord.ext import commands
from libbot import config_get
from libbot.pycord.classes import PycordBot from libbot.pycord.classes import PycordBot
from modules.database import col_users from modules.database import col_users
from modules.utils import config_get
class Logger(commands.Cog): class Logger(commands.Cog):
@@ -15,8 +15,8 @@ class Logger(commands.Cog):
async def on_message(self, message: Message): async def on_message(self, message: Message):
if ( if (
(message.author != self.client.user) (message.author != self.client.user)
and (message.author.bot == False) and (message.author.bot is False)
and (message.author.system == False) and (message.author.system is False)
): ):
if col_users.find_one({"user": message.author.id}) is None: if col_users.find_one({"user": message.author.id}) is None:
user = {} user = {}
@@ -42,8 +42,8 @@ class Logger(commands.Cog):
if ( if (
(member != self.client.user) (member != self.client.user)
and (member.bot == False) and (member.bot is False)
and (member.system == False) and (member.system is False)
): ):
await welcome_chan.send( await welcome_chan.send(
content=(await config_get("welcome", "messages")).format( content=(await config_get("welcome", "messages")).format(

View File

@@ -1,11 +1,11 @@
import logging import logging
from discord import Activity, ActivityType from discord import Activity, ActivityType
from libbot import config_get
from libbot.sync import config_get as sync_config_get
from modules.client import client from modules.client import client
from modules.scheduled import scheduler from modules.scheduled import scheduler
from modules.utils import config_get
from modules.utils_sync import config_get_sync
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@@ -67,7 +67,7 @@ def main():
try: try:
scheduler.start() scheduler.start()
client.run(config_get_sync("bot_token", "bot")) client.run(sync_config_get("bot_token", "bot"))
except KeyboardInterrupt: except KeyboardInterrupt:
scheduler.shutdown() scheduler.shutdown()
exit() exit()

View File

@@ -1,38 +0,0 @@
from typing import Any
import aiofiles
from ujson import dumps, loads
async def json_read(path: str) -> Any:
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
data = await f.read()
return loads(data)
async def json_write(data: Any, path: str) -> None:
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
await f.write(
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
)
async def config_get(key: str, *path: str) -> Any:
this_key = await json_read("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
async def config_set(key: str, value: Any, *path: str) -> None:
this_dict = await json_read("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
await json_write(this_dict, "config.json")
return

View File

@@ -1,45 +1,10 @@
from typing import Any, Union from typing import Union
from discord import Member, User from discord import Member, User
from ujson import dumps, loads
def json_read_sync(path: str) -> Any:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Union[Member, User]) -> str: def guild_name(member: Union[Member, User]) -> str:
if isinstance(member, User): if isinstance(member, User):
return member.name return member.name
if member.nick == None:
return member.name return member.name if member.nick is None else member.nick
else:
return member.nick