v0.1.0-rc.3 #15
38
README.md
38
README.md
@ -1,2 +1,38 @@
|
||||
# HoloBotDiscord
|
||||
<h1 align="center">HoloBot Discord</h1>
|
||||
|
||||
<p align="center">Small Discord bot made on Py-Cord</p>
|
||||
|
||||
<p align="center">
|
||||
<a href="https://git.end-play.xyz/HoloUA/Discord/src/branch/master/LICENSE"><img alt="License: GPL" src="https://img.shields.io/badge/License-GPL-blue"></a>
|
||||
<a href="https://git.end-play.xyz/HoloUA/Discord"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
|
||||
</p>
|
||||
|
||||
## Installation
|
||||
|
||||
1. Install MongoDB using the [official installation manual](https://www.mongodb.com/docs/manual/installation/).
|
||||
2. `git clone https://git.end-play.xyz/HoloUA/Discord.git`
|
||||
3. `cd Discord`
|
||||
4. Install Python 3.9+ (at least 3.11 is recommended) for your OS
|
||||
5. `python3 -m pip install -r requirements.txt`
|
||||
6. Run it with `python3 main.py` after configuring
|
||||
|
||||
## Configuration
|
||||
|
||||
There's a file `config_example.json` which contains default configuration
|
||||
and should be used as a base config.
|
||||
|
||||
Copy this file to `config.json` and open it with any text editor of your liking.
|
||||
|
||||
Modify the newly created configuration file to fit your needs.
|
||||
|
||||
Mandatory keys to modify:
|
||||
|
||||
- guild
|
||||
- bot.owner
|
||||
- bot.bot_token
|
||||
- database.*
|
||||
- categories.*
|
||||
- channels.*
|
||||
- roles.*
|
||||
|
||||
After all of that you're good to go! Happy using :)
|
@ -1,60 +1,41 @@
|
||||
import logging
|
||||
from typing import Any, Union
|
||||
from typing import Any, Union, Dict
|
||||
|
||||
import discord
|
||||
import discord.member
|
||||
from bson import ObjectId
|
||||
from discord import User, Member
|
||||
from libbot import config_get
|
||||
|
||||
from modules.database import col_users, col_warnings
|
||||
from modules.utils import config_get
|
||||
from errors import UserNotFoundError
|
||||
from modules.database import col_warnings, sync_col_users, sync_col_warnings, col_users
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotEnoughMoneyError(Exception):
|
||||
"""User does not have enough money to do that"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class UserNotFoundError(Exception):
|
||||
"""HoloUser could not find user with such an ID in database"""
|
||||
|
||||
def __init__(self, user, user_id):
|
||||
self.user = user
|
||||
self.user_id = user_id
|
||||
super().__init__(
|
||||
f"User of type {type(self.user)} with id {self.user_id} was not found"
|
||||
)
|
||||
|
||||
|
||||
class HoloUser:
|
||||
def __init__(
|
||||
self, user: Union[discord.User, discord.Member, discord.member.Member, int]
|
||||
) -> None:
|
||||
def __init__(self, user: Union[User, Member, int]) -> None:
|
||||
"""Get an object that has a proper binding between Discord ID and database
|
||||
|
||||
### Args:
|
||||
* `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted
|
||||
* `user` (Union[User, Member, int]): Object from which ID can be extracted
|
||||
|
||||
### Raises:
|
||||
* `UserNotFoundError`: User with such ID does not seem to exist in database
|
||||
"""
|
||||
|
||||
if hasattr(user, "id"):
|
||||
self.id = user.id # type: ignore
|
||||
else:
|
||||
self.id = user
|
||||
self.id: int = user if not hasattr(user, "id") else user.id
|
||||
|
||||
jav_user = col_users.find_one({"user": self.id})
|
||||
jav_user: Union[Dict[str, Any], None] = sync_col_users.find_one(
|
||||
{"user": self.id}
|
||||
)
|
||||
|
||||
if jav_user is None:
|
||||
raise UserNotFoundError(user=user, user_id=self.id)
|
||||
|
||||
self.db_id = jav_user["_id"]
|
||||
self.db_id: ObjectId = jav_user["_id"]
|
||||
|
||||
self.customrole = jav_user["customrole"]
|
||||
self.customchannel = jav_user["customchannel"]
|
||||
self.warnings = self.warns()
|
||||
self.customrole: Union[int, None] = jav_user["customrole"]
|
||||
self.customchannel: Union[int, None] = jav_user["customchannel"]
|
||||
self.warnings: int = self.warns()
|
||||
|
||||
def warns(self) -> int:
|
||||
"""Get number of warnings user has
|
||||
@ -62,29 +43,33 @@ class HoloUser:
|
||||
### Returns:
|
||||
* `int`: Number of warnings
|
||||
"""
|
||||
warns = col_warnings.find_one({"user": self.id})
|
||||
if warns == None:
|
||||
return 0
|
||||
else:
|
||||
return warns["warns"]
|
||||
warns: Union[Dict[str, Any], None] = sync_col_warnings.find_one(
|
||||
{"user": self.id}
|
||||
)
|
||||
|
||||
def warn(self, count=1, reason: str = "Not provided") -> None:
|
||||
return 0 if warns is None else warns["warns"]
|
||||
|
||||
async def warn(self, count=1, reason: str = "Not provided") -> None:
|
||||
"""Warn and add count to warns number
|
||||
|
||||
### Args:
|
||||
* `count` (int, optional): Count of warnings to be added. Defaults to 1.
|
||||
"""
|
||||
warns = col_warnings.find_one({"user": self.id})
|
||||
if warns != None:
|
||||
col_warnings.update_one(
|
||||
filter={"_id": self.db_id},
|
||||
update={"$set": {"warns": warns["warns"] + count}},
|
||||
warns: Union[Dict[str, Any], None] = await col_warnings.find_one(
|
||||
{"user": self.id}
|
||||
)
|
||||
|
||||
if warns is not None:
|
||||
await col_warnings.update_one(
|
||||
{"_id": self.db_id},
|
||||
{"$set": {"warns": warns["warns"] + count}},
|
||||
)
|
||||
else:
|
||||
col_warnings.insert_one(document={"user": self.id, "warns": count})
|
||||
logger.info(f"User {self.id} was warned {count} times due to: {reason}")
|
||||
await col_warnings.insert_one(document={"user": self.id, "warns": count})
|
||||
|
||||
def set(self, key: str, value: Any) -> None:
|
||||
logger.info("User %s was warned %s times due to: %s", self.id, count, reason)
|
||||
|
||||
async def set(self, key: str, value: Any) -> None:
|
||||
"""Set attribute data and save it into database
|
||||
|
||||
### Args:
|
||||
@ -93,49 +78,56 @@ class HoloUser:
|
||||
"""
|
||||
if not hasattr(self, key):
|
||||
raise AttributeError()
|
||||
setattr(self, key, value)
|
||||
col_users.update_one(
|
||||
filter={"_id": self.db_id}, update={"$set": {key: value}}, upsert=True
|
||||
)
|
||||
logger.info(f"Set attribute {key} of user {self.id} to {value}")
|
||||
|
||||
async def is_moderator(
|
||||
self, member: Union[discord.User, discord.Member, discord.member.Member]
|
||||
) -> bool:
|
||||
setattr(self, key, value)
|
||||
|
||||
await col_users.update_one(
|
||||
{"_id": self.db_id}, {"$set": {key: value}}, upsert=True
|
||||
)
|
||||
|
||||
logger.info("Set attribute %s of user %s to %s", key, self.id, value)
|
||||
|
||||
@staticmethod
|
||||
async def is_moderator(member: Union[User, Member]) -> bool:
|
||||
"""Check if user is moderator or council member
|
||||
|
||||
### Args:
|
||||
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
|
||||
* `member` (Union[User, Member]): Member object
|
||||
|
||||
### Returns:
|
||||
`bool`: `True` if member is a moderator or member of council and `False` if not
|
||||
"""
|
||||
if isinstance(member, discord.User):
|
||||
if isinstance(member, User):
|
||||
return False
|
||||
moderator_role = await config_get("moderators", "roles")
|
||||
council_role = await config_get("council", "roles")
|
||||
|
||||
moderator_role: Union[int, None] = await config_get("moderators", "roles")
|
||||
council_role: Union[int, None] = await config_get("council", "roles")
|
||||
|
||||
for role in member.roles:
|
||||
if role.id == moderator_role or role.id == council_role:
|
||||
if role.id in (moderator_role, council_role):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
async def is_council(
|
||||
self, member: Union[discord.User, discord.Member, discord.member.Member]
|
||||
) -> bool:
|
||||
@staticmethod
|
||||
async def is_council(member: Union[User, Member]) -> bool:
|
||||
"""Check if user is a member of council
|
||||
|
||||
### Args:
|
||||
* `member` (Union[discord.User, discord.Member, discord.member.Member]): Member object
|
||||
* `member` (Union[User, Member]): Member object
|
||||
|
||||
### Returns:
|
||||
`bool`: `True` if member is a member of council and `False` if not
|
||||
"""
|
||||
if isinstance(member, discord.User):
|
||||
if isinstance(member, User):
|
||||
return False
|
||||
|
||||
council_role = await config_get("council", "roles")
|
||||
|
||||
for role in member.roles:
|
||||
if role.id == council_role:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
# def purge(self) -> None:
|
||||
|
141
cogs/admin.py
141
cogs/admin.py
@ -1,22 +1,37 @@
|
||||
import logging
|
||||
import sys
|
||||
from typing import Union
|
||||
|
||||
from discord import ApplicationContext, Embed, User, option, slash_command
|
||||
from discord import (
|
||||
ApplicationContext,
|
||||
Embed,
|
||||
User,
|
||||
option,
|
||||
slash_command,
|
||||
Role,
|
||||
TextChannel,
|
||||
)
|
||||
from discord import utils as ds_utils
|
||||
from discord.ext import commands
|
||||
from libbot import config_get
|
||||
from libbot.pycord.classes import PycordBot
|
||||
from libbot.sync import config_get as sync_config_get
|
||||
|
||||
from enums.colors import Color
|
||||
from modules.scheduled import scheduler
|
||||
from modules.utils import config_get
|
||||
from modules.utils_sync import config_get_sync, guild_name
|
||||
from enums import Color
|
||||
from modules.scheduler import scheduler
|
||||
from modules.utils_sync import guild_name
|
||||
from modules.waifu_pics import waifu_pics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Admin(commands.Cog):
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client = client
|
||||
"""Cog with utility commands for admins."""
|
||||
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client: PycordBot = client
|
||||
|
||||
# Disabled because warning functionality is temporarily not needed
|
||||
# @slash_command(
|
||||
# name="warning",
|
||||
# description="Попередити юзера про порушення правил",
|
||||
@ -102,7 +117,7 @@ class Admin(commands.Cog):
|
||||
@slash_command(
|
||||
name="clear",
|
||||
description="Видалити деяку кількість повідомлень в каналі",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option("amount", description="Кількість")
|
||||
@option("user", description="Користувач", default=None)
|
||||
@ -111,7 +126,7 @@ class Admin(commands.Cog):
|
||||
ctx: ApplicationContext,
|
||||
amount: int,
|
||||
user: User,
|
||||
):
|
||||
) -> None:
|
||||
if ctx.user.id in self.client.owner_ids:
|
||||
logging.info(
|
||||
"User %s removed %s message(s) in %s",
|
||||
@ -119,89 +134,107 @@ class Admin(commands.Cog):
|
||||
amount,
|
||||
ctx.channel.id,
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(description="Видаляю..."), ephemeral=True, delete_after=2.0
|
||||
)
|
||||
if user == None:
|
||||
|
||||
if user is None:
|
||||
await ctx.channel.purge(limit=amount)
|
||||
else:
|
||||
await ctx.channel.purge(
|
||||
limit=amount, check=lambda msg: msg.author == user
|
||||
)
|
||||
else:
|
||||
logging.warning(
|
||||
"User %s tried to use /clear but permission denied",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Відмовлено в доступі",
|
||||
description="Здається, це команда лише для модераторів",
|
||||
color=Color.fail,
|
||||
)
|
||||
)
|
||||
mod_role = ds_utils.get(
|
||||
ctx.user.guild.roles, id=await config_get("moderators", "roles")
|
||||
)
|
||||
admin_chan = ds_utils.get(
|
||||
ctx.user.guild.channels,
|
||||
id=await config_get("adminchat", "channels", "text"),
|
||||
|
||||
return
|
||||
|
||||
logging.warning(
|
||||
"User %s tried to use /clear but permission denied",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Відмовлено в доступі",
|
||||
description="Здається, це команда лише для модераторів",
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
|
||||
mod_role: Union[Role, None] = ds_utils.get(
|
||||
ctx.user.guild.roles, id=await config_get("moderators", "roles")
|
||||
)
|
||||
admin_chan: Union[TextChannel, None] = ds_utils.get(
|
||||
ctx.user.guild.channels,
|
||||
id=await config_get("adminchat", "channels", "text"),
|
||||
)
|
||||
|
||||
if admin_chan is not None:
|
||||
await admin_chan.send(
|
||||
content=f"{mod_role.mention}",
|
||||
content="" if mod_role is None else mod_role.mention,
|
||||
embed=Embed(
|
||||
title="Неавторизований запит",
|
||||
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
),
|
||||
)
|
||||
|
||||
@slash_command(
|
||||
name="reboot",
|
||||
description="Перезапустити бота",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
async def reboot_cmd(self, ctx: ApplicationContext):
|
||||
async def reboot_cmd(self, ctx: ApplicationContext) -> None:
|
||||
await ctx.defer(ephemeral=True)
|
||||
|
||||
if ctx.user.id in self.client.owner_ids:
|
||||
logging.info("Calling shutdown initiated by %s", guild_name(ctx.user))
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Вимикаюсь...",
|
||||
description="Спробую перезавантажитись за 5 секунд",
|
||||
)
|
||||
)
|
||||
|
||||
scheduler.shutdown()
|
||||
|
||||
await self.client.close()
|
||||
exit()
|
||||
else:
|
||||
logging.warning(
|
||||
"User %s tried to use /reboot but permission denied",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Відмовлено в доступі",
|
||||
description="Здається, це команда лише для модераторів",
|
||||
color=Color.fail,
|
||||
)
|
||||
)
|
||||
mod_role = ds_utils.get(
|
||||
ctx.user.guild.roles, id=await config_get("moderators", "roles")
|
||||
)
|
||||
admin_chan = ds_utils.get(
|
||||
ctx.user.guild.channels,
|
||||
id=await config_get("adminchat", "channels", "text"),
|
||||
await waifu_pics._client_session.close()
|
||||
|
||||
sys.exit()
|
||||
|
||||
logging.warning(
|
||||
"User %s tried to use /reboot but permission denied",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Відмовлено в доступі",
|
||||
description="Здається, це команда лише для модераторів",
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
|
||||
mod_role: Union[Role, None] = ds_utils.get(
|
||||
ctx.user.guild.roles, id=await config_get("moderators", "roles")
|
||||
)
|
||||
admin_chan: Union[TextChannel, None] = ds_utils.get(
|
||||
ctx.user.guild.channels,
|
||||
id=await config_get("adminchat", "channels", "text"),
|
||||
)
|
||||
|
||||
if admin_chan is not None:
|
||||
await admin_chan.send(
|
||||
content=f"{mod_role.mention}",
|
||||
content="" if mod_role is None else mod_role.mention,
|
||||
embed=Embed(
|
||||
title="Неавторизований запит",
|
||||
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def setup(client: PycordBot):
|
||||
def setup(client: PycordBot) -> None:
|
||||
client.add_cog(Admin(client))
|
||||
|
@ -1,4 +1,5 @@
|
||||
import logging
|
||||
from typing import Dict, List, Any
|
||||
|
||||
from discord import Cog, Message
|
||||
from discord.ext import commands
|
||||
@ -11,16 +12,17 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class Analytics(commands.Cog):
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client = client
|
||||
self.client: PycordBot = client
|
||||
|
||||
@Cog.listener()
|
||||
async def on_message(self, message: Message):
|
||||
async def on_message(self, message: Message) -> None:
|
||||
if (
|
||||
(message.author != self.client.user)
|
||||
and (message.author.bot == False)
|
||||
and (message.author.system == False)
|
||||
and (message.author.bot is False)
|
||||
and (message.author.system is False)
|
||||
):
|
||||
stickers = []
|
||||
stickers: List[Dict[str, Any]] = []
|
||||
|
||||
for sticker in message.stickers:
|
||||
stickers.append(
|
||||
{
|
||||
@ -31,7 +33,8 @@ class Analytics(commands.Cog):
|
||||
}
|
||||
)
|
||||
|
||||
attachments = []
|
||||
attachments: List[Dict[str, Any]] = []
|
||||
|
||||
for attachment in message.attachments:
|
||||
attachments.append(
|
||||
{
|
||||
@ -46,7 +49,7 @@ class Analytics(commands.Cog):
|
||||
}
|
||||
)
|
||||
|
||||
col_analytics.insert_one(
|
||||
await col_analytics.insert_one(
|
||||
{
|
||||
"user": message.author.id,
|
||||
"channel": message.channel.id,
|
||||
@ -57,5 +60,5 @@ class Analytics(commands.Cog):
|
||||
)
|
||||
|
||||
|
||||
def setup(client: PycordBot):
|
||||
def setup(client: PycordBot) -> None:
|
||||
client.add_cog(Analytics(client))
|
||||
|
@ -1,116 +1,152 @@
|
||||
from discord import ApplicationContext, Embed, option
|
||||
import logging
|
||||
from typing import Any, Dict, Union
|
||||
|
||||
from discord import ApplicationContext, Embed, option, TextChannel, Role
|
||||
from discord import utils as ds_utils
|
||||
from discord.abc import GuildChannel
|
||||
from discord.commands import SlashCommandGroup
|
||||
from discord.ext import commands
|
||||
from libbot import config_get
|
||||
from libbot.pycord.classes import PycordBot
|
||||
from libbot.sync import config_get as sync_config_get
|
||||
|
||||
from classes.holo_user import HoloUser
|
||||
from enums.colors import Color
|
||||
from enums import Color
|
||||
from modules.database import col_users
|
||||
from modules.utils import config_get
|
||||
from modules.utils_sync import config_get_sync, guild_name
|
||||
from modules.utils_sync import guild_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CustomChannels(commands.Cog):
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client = client
|
||||
self.client: PycordBot = client
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_guild_channel_delete(self, channel: GuildChannel):
|
||||
col_users.find_one_and_update(
|
||||
async def on_guild_channel_delete(self, channel: GuildChannel) -> None:
|
||||
await col_users.find_one_and_update(
|
||||
{"customchannel": channel.id}, {"$set": {"customchannel": None}}
|
||||
)
|
||||
|
||||
customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом")
|
||||
custom_channel_group: SlashCommandGroup = SlashCommandGroup(
|
||||
"customchannel", "Керування особистим каналом"
|
||||
)
|
||||
|
||||
@customchannel.command(
|
||||
@custom_channel_group.command(
|
||||
name="get",
|
||||
description="Отримати персональний текстовий канал",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option("name", description="Назва каналу")
|
||||
@option("reactions", description="Дозволити реакції")
|
||||
@option("threads", description="Дозволити гілки")
|
||||
async def customchannel_get_cmd(
|
||||
async def custom_channel_get_cmd(
|
||||
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
|
||||
):
|
||||
holo_user_ctx = HoloUser(ctx.user)
|
||||
) -> None:
|
||||
holo_user_ctx: HoloUser = HoloUser(ctx.user)
|
||||
|
||||
if holo_user_ctx.customchannel == None:
|
||||
await ctx.defer()
|
||||
created_channel = await ctx.user.guild.create_text_channel(
|
||||
name=name,
|
||||
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
|
||||
category=ds_utils.get(
|
||||
ctx.author.guild.categories,
|
||||
id=await config_get("customchannels", "categories"),
|
||||
),
|
||||
)
|
||||
await created_channel.set_permissions(
|
||||
ctx.user.guild.default_role,
|
||||
send_messages=False,
|
||||
add_reactions=reactions,
|
||||
create_public_threads=threads,
|
||||
create_private_threads=threads,
|
||||
)
|
||||
await created_channel.set_permissions(
|
||||
ctx.user,
|
||||
attach_files=True,
|
||||
manage_messages=True,
|
||||
send_messages=True,
|
||||
embed_links=True,
|
||||
manage_channels=True,
|
||||
)
|
||||
holo_user_ctx.set("customchannel", created_channel.id)
|
||||
# Return if the user is using the command outside of a guild
|
||||
if not hasattr(ctx.author, "guild"):
|
||||
await ctx.defer(ephemeral=True)
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Створено канал",
|
||||
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
|
||||
color=Color.success,
|
||||
title="Помилка виконання",
|
||||
description="Виконання за межами сервера не є можливим.",
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
bots = await config_get("bots")
|
||||
for bot in bots:
|
||||
await created_channel.set_permissions(
|
||||
ds_utils.get(ctx.user.guild.roles, id=bots[bot]["role"]),
|
||||
view_channel=False,
|
||||
)
|
||||
else:
|
||||
return
|
||||
|
||||
# Return if the user already has a custom channel
|
||||
if holo_user_ctx.customchannel is not None:
|
||||
await ctx.defer(ephemeral=True)
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Помилка виконання",
|
||||
description="У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
@customchannel.command(
|
||||
await ctx.defer()
|
||||
|
||||
created_channel: TextChannel = await ctx.user.guild.create_text_channel(
|
||||
name=name,
|
||||
reason=f"Користувач {guild_name(ctx.user)} отримав власний приватний канал",
|
||||
category=ds_utils.get(
|
||||
ctx.author.guild.categories,
|
||||
id=await config_get("customchannels", "categories"),
|
||||
),
|
||||
)
|
||||
|
||||
await created_channel.set_permissions(
|
||||
ctx.user.guild.default_role,
|
||||
send_messages=False,
|
||||
add_reactions=reactions,
|
||||
create_public_threads=threads,
|
||||
create_private_threads=threads,
|
||||
)
|
||||
await created_channel.set_permissions(
|
||||
ctx.user,
|
||||
attach_files=True,
|
||||
manage_messages=True,
|
||||
send_messages=True,
|
||||
embed_links=True,
|
||||
manage_channels=True,
|
||||
)
|
||||
|
||||
await holo_user_ctx.set("customchannel", created_channel.id)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Створено канал",
|
||||
description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`",
|
||||
color=Color.SUCCESS,
|
||||
)
|
||||
)
|
||||
|
||||
bots: Dict[str, Any] = await config_get("bots")
|
||||
|
||||
for bot in bots:
|
||||
role: Union[Role, None] = ds_utils.get(
|
||||
ctx.user.guild.roles, id=bots[bot]["role"]
|
||||
)
|
||||
|
||||
if role is not None:
|
||||
await created_channel.set_permissions(
|
||||
role,
|
||||
view_channel=False,
|
||||
)
|
||||
|
||||
@custom_channel_group.command(
|
||||
name="edit",
|
||||
description="Змінити параметри особистого каналу",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option("name", description="Назва каналу")
|
||||
@option("reactions", description="Дозволити реакції")
|
||||
@option("threads", description="Дозволити гілки")
|
||||
async def customchannel_edit_cmd(
|
||||
async def custom_channel_edit_cmd(
|
||||
self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool
|
||||
):
|
||||
holo_user_ctx = HoloUser(ctx.user)
|
||||
) -> None:
|
||||
holo_user_ctx: HoloUser = HoloUser(ctx.user)
|
||||
|
||||
custom_channel = ds_utils.get(
|
||||
custom_channel: Union[TextChannel, None] = ds_utils.get(
|
||||
ctx.guild.channels, id=holo_user_ctx.customchannel
|
||||
)
|
||||
|
||||
# Return if the channel was not found
|
||||
if custom_channel is None:
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Канал не знайдено",
|
||||
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
await custom_channel.edit(name=name)
|
||||
await custom_channel.set_permissions(
|
||||
ctx.user.guild.default_role,
|
||||
@ -119,68 +155,84 @@ class CustomChannels(commands.Cog):
|
||||
create_public_threads=threads,
|
||||
create_private_threads=threads,
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Канал змінено",
|
||||
description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
|
||||
@customchannel.command(
|
||||
@custom_channel_group.command(
|
||||
name="remove",
|
||||
description="Відібрати канал, знищуючи його, та частково повернути кошти",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option("confirm", description="Підтвердження операції")
|
||||
async def customchannel_remove_cmd(
|
||||
async def custom_channel_remove_cmd(
|
||||
self, ctx: ApplicationContext, confirm: bool = False
|
||||
):
|
||||
holo_user_ctx = HoloUser(ctx.user)
|
||||
) -> None:
|
||||
holo_user_ctx: HoloUser = HoloUser(ctx.user)
|
||||
|
||||
if holo_user_ctx.customchannel is not None:
|
||||
await ctx.defer()
|
||||
custom_channel = ds_utils.get(
|
||||
ctx.guild.channels, id=holo_user_ctx.customchannel
|
||||
)
|
||||
if custom_channel is None:
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Канал не знайдено",
|
||||
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
|
||||
color=Color.fail,
|
||||
)
|
||||
)
|
||||
holo_user_ctx.set("customchannel", None)
|
||||
return
|
||||
if not confirm:
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Підтвердження не надано",
|
||||
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
|
||||
color=Color.fail,
|
||||
)
|
||||
)
|
||||
return
|
||||
await custom_channel.delete(reason="Власник запросив видалення")
|
||||
holo_user_ctx.set("customchannel", None)
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Канал знищено",
|
||||
description="Ви відмовились від каналу та видалили його.",
|
||||
color=Color.default,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Return if the user does not have a custom channel
|
||||
if holo_user_ctx.customchannel is None:
|
||||
await ctx.defer(ephemeral=True)
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Помилка виконання",
|
||||
description="У вас немає особистого каналу.",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
await ctx.defer()
|
||||
|
||||
custom_channel: Union[TextChannel, None] = ds_utils.get(
|
||||
ctx.guild.channels, id=holo_user_ctx.customchannel
|
||||
)
|
||||
|
||||
# Return if the channel was not found
|
||||
if custom_channel is None:
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Канал не знайдено",
|
||||
description="Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.",
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
await holo_user_ctx.set("customchannel", None)
|
||||
return
|
||||
|
||||
# Return if the confirmation is missing
|
||||
if not confirm:
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Підтвердження не надано",
|
||||
description="Для підтвердження операції додайте до команди параметр `confirm` зі значенням `True`.",
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
await custom_channel.delete(reason="Власник запросив видалення")
|
||||
|
||||
await holo_user_ctx.set("customchannel", None)
|
||||
|
||||
try:
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Канал знищено",
|
||||
description="Ви відмовились від каналу та видалили його.",
|
||||
color=Color.DEFAULT,
|
||||
)
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Could not send a custom channel removal confirmation due to: %s", exc
|
||||
)
|
||||
|
||||
|
||||
def setup(client: PycordBot):
|
||||
def setup(client: PycordBot) -> None:
|
||||
client.add_cog(CustomChannels(client))
|
||||
|
192
cogs/data.py
192
cogs/data.py
@ -1,167 +1,183 @@
|
||||
import logging
|
||||
from os import makedirs
|
||||
from pathlib import Path
|
||||
from typing import Union, List, Dict, Any
|
||||
from uuid import uuid4
|
||||
|
||||
from discord import ApplicationContext, Embed, File, option
|
||||
from discord import ApplicationContext, Embed, File, option, Role, TextChannel
|
||||
from discord import utils as ds_utils
|
||||
from discord.commands import SlashCommandGroup
|
||||
from discord.ext import commands
|
||||
from libbot import config_get
|
||||
from libbot.pycord.classes import PycordBot
|
||||
from libbot.sync import config_get as sync_config_get
|
||||
from libbot.sync import json_write as sync_json_write
|
||||
|
||||
from classes.holo_user import HoloUser
|
||||
from enums.colors import Color
|
||||
from enums import Color
|
||||
from modules.database import col_users
|
||||
from modules.utils import config_get
|
||||
from modules.utils_sync import config_get_sync, guild_name, json_write_sync
|
||||
from modules.utils_sync import guild_name
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Data(commands.Cog):
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client = client
|
||||
self.client: PycordBot = client
|
||||
|
||||
data = SlashCommandGroup("data", "Керування даними користувачів")
|
||||
data: SlashCommandGroup = SlashCommandGroup("data", "Керування даними користувачів")
|
||||
|
||||
@data.command(
|
||||
name="export",
|
||||
description="Експортувати дані",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option(
|
||||
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
|
||||
)
|
||||
async def data_export_cmd(self, ctx: ApplicationContext, kind: str):
|
||||
async def data_export_cmd(self, ctx: ApplicationContext, kind: str) -> None:
|
||||
await ctx.defer()
|
||||
holo_user = HoloUser(ctx.author)
|
||||
if (ctx.user.id in self.client.owner_ids) or (
|
||||
await holo_user.is_council(ctx.author)
|
||||
|
||||
# Return if the user is not an owner and not in the council
|
||||
if (ctx.user.id not in self.client.owner_ids) and not (
|
||||
await HoloUser.is_council(ctx.author)
|
||||
):
|
||||
logging.info(
|
||||
"Moderator %s exported current users list", guild_name(ctx.user)
|
||||
)
|
||||
makedirs("tmp", exist_ok=True)
|
||||
uuid = str(uuid4())
|
||||
|
||||
if kind == "Користувачі":
|
||||
users = []
|
||||
|
||||
for member in ctx.guild.members:
|
||||
users.append(
|
||||
{
|
||||
"id": member.id,
|
||||
"nick": member.nick,
|
||||
"username": f"{member.name}#{member.discriminator}",
|
||||
"bot": member.bot,
|
||||
}
|
||||
)
|
||||
|
||||
json_write_sync(users, str(Path(f"tmp/{uuid}")))
|
||||
|
||||
await ctx.respond(
|
||||
file=File(str(Path(f"tmp/{uuid}")), filename="users.json")
|
||||
)
|
||||
else:
|
||||
logging.info(
|
||||
"User %s tried to use /export but permission denied",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Відмовлено в доступі",
|
||||
description="Здається, це команда лише для модераторів",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
mod_role = ds_utils.get(
|
||||
|
||||
mod_role: Union[Role, None] = ds_utils.get(
|
||||
ctx.user.guild.roles, id=await config_get("moderators", "roles")
|
||||
)
|
||||
admin_chan = ds_utils.get(
|
||||
admin_chan: Union[TextChannel, None] = ds_utils.get(
|
||||
ctx.user.guild.channels,
|
||||
id=await config_get("adminchat", "channels", "text"),
|
||||
)
|
||||
|
||||
await admin_chan.send(
|
||||
content=f"{mod_role.mention}",
|
||||
content="" if mod_role is None else mod_role.mention,
|
||||
embed=Embed(
|
||||
title="Неавторизований запит",
|
||||
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
),
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
logging.info("Moderator %s exported current users list", guild_name(ctx.user))
|
||||
|
||||
makedirs("tmp", exist_ok=True)
|
||||
|
||||
uuid: str = str(uuid4())
|
||||
|
||||
if kind == "Користувачі":
|
||||
users: List[Dict[str, Any]] = []
|
||||
|
||||
for member in ctx.guild.members:
|
||||
users.append(
|
||||
{
|
||||
"id": member.id,
|
||||
"nick": member.nick,
|
||||
"username": f"{member.name}#{member.discriminator}",
|
||||
"bot": member.bot,
|
||||
}
|
||||
)
|
||||
|
||||
sync_json_write(users, Path(f"tmp/{uuid}"))
|
||||
|
||||
await ctx.respond(file=File(Path(f"tmp/{uuid}"), filename="users.json"))
|
||||
|
||||
@data.command(
|
||||
name="migrate",
|
||||
description="Мігрувати всіх користувачів до бази",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option(
|
||||
"kind", description="Тип даних, які треба експортувати", choices=["Користувачі"]
|
||||
)
|
||||
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str):
|
||||
async def data_migrate_cmd(self, ctx: ApplicationContext, kind: str) -> None:
|
||||
await ctx.defer()
|
||||
holo_user = HoloUser(ctx.author)
|
||||
if (ctx.user.id in self.client.owner_ids) or (
|
||||
await holo_user.is_council(ctx.author)
|
||||
|
||||
# Return if the user is not an owner and not in the council
|
||||
if (ctx.user.id not in self.client.owner_ids) and not (
|
||||
await HoloUser.is_council(ctx.author)
|
||||
):
|
||||
logging.info(
|
||||
"Moderator %s started migration of all members to the database",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
|
||||
if kind == "Користувачі":
|
||||
for member in ctx.guild.members:
|
||||
if member.bot:
|
||||
continue
|
||||
if col_users.find_one({"user": member.id}) is None:
|
||||
user = {}
|
||||
defaults = await config_get("user", "defaults")
|
||||
|
||||
user["user"] = member.id
|
||||
|
||||
for key in defaults:
|
||||
user[key] = defaults[key]
|
||||
|
||||
col_users.insert_one(document=user)
|
||||
logging.info(
|
||||
"Added DB record for user %s during migration", member.id
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Міграцію завершено",
|
||||
description="Всім користувачам сервера було створено записи в базі даних.",
|
||||
color=Color.success,
|
||||
)
|
||||
)
|
||||
else:
|
||||
logging.info(
|
||||
"User %s tried to use /migrate but permission denied",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Відмовлено в доступі",
|
||||
description="Здається, це команда лише для модераторів",
|
||||
color=Color.fail,
|
||||
color=Color.FAIL,
|
||||
)
|
||||
)
|
||||
mod_role = ds_utils.get(
|
||||
|
||||
mod_role: Union[Role, None] = ds_utils.get(
|
||||
ctx.user.guild.roles, id=await config_get("moderators", "roles")
|
||||
)
|
||||
admin_chan = ds_utils.get(
|
||||
admin_chan: Union[TextChannel, None] = ds_utils.get(
|
||||
ctx.user.guild.channels,
|
||||
id=await config_get("adminchat", "channels", "text"),
|
||||
)
|
||||
await admin_chan.send(
|
||||
content=f"{mod_role.mention}",
|
||||
embed=Embed(
|
||||
title="Неавторизований запит",
|
||||
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
|
||||
color=Color.fail,
|
||||
),
|
||||
|
||||
if admin_chan is not None:
|
||||
await admin_chan.send(
|
||||
content="" if mod_role is None else mod_role.mention,
|
||||
embed=Embed(
|
||||
title="Неавторизований запит",
|
||||
description=f"Користувач {ctx.user.mention} запитав у каналі {ctx.channel.mention} команду, до якої не повинен мати доступу/бачити.",
|
||||
color=Color.FAIL,
|
||||
),
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
logging.info(
|
||||
"Moderator %s started migration of all members to the database",
|
||||
guild_name(ctx.user),
|
||||
)
|
||||
|
||||
if kind == "Користувачі":
|
||||
for member in ctx.guild.members:
|
||||
if member.bot:
|
||||
continue
|
||||
|
||||
if (await col_users.find_one({"user": member.id})) is None:
|
||||
user: Dict[str, Any] = {}
|
||||
defaults: Dict[str, Any] = await config_get("user", "defaults")
|
||||
|
||||
user["user"] = member.id
|
||||
|
||||
for key in defaults:
|
||||
user[key] = defaults[key]
|
||||
|
||||
await col_users.insert_one(document=user)
|
||||
|
||||
logging.info(
|
||||
"Added DB record for user %s during migration", member.id
|
||||
)
|
||||
|
||||
await ctx.respond(
|
||||
embed=Embed(
|
||||
title="Міграцію завершено",
|
||||
description="Всім користувачам сервера було створено записи в базі даних.",
|
||||
color=Color.SUCCESS,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def setup(client: PycordBot):
|
||||
def setup(client: PycordBot) -> None:
|
||||
client.add_cog(Data(client))
|
||||
|
31
cogs/fun.py
31
cogs/fun.py
@ -2,39 +2,38 @@ import logging
|
||||
|
||||
from discord import ApplicationContext, Embed, User, option, slash_command
|
||||
from discord.ext import commands
|
||||
from libbot import config_get
|
||||
from libbot.pycord.classes import PycordBot
|
||||
from WaifuPicsPython import WaifuAsync
|
||||
from libbot.sync import config_get as sync_config_get
|
||||
|
||||
from modules.utils import config_get
|
||||
from modules.utils_sync import config_get_sync, guild_name
|
||||
from modules.utils_sync import guild_name
|
||||
from modules.waifu_pics import waifu_pics
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
wafiu_pics = WaifuAsync()
|
||||
|
||||
|
||||
class Fun(commands.Cog):
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client = client
|
||||
self.client: PycordBot = client
|
||||
|
||||
@slash_command(
|
||||
name="action",
|
||||
description="Провести над користувачем РП дію",
|
||||
guild_ids=[config_get_sync("guild")],
|
||||
guild_ids=[sync_config_get("guild")],
|
||||
)
|
||||
@option(
|
||||
"type",
|
||||
description="Тип дії, яку хочете провести з користувачем",
|
||||
choices=config_get_sync("actions").keys(),
|
||||
choices=sync_config_get("actions").keys(),
|
||||
)
|
||||
@option("user", description="Користувач")
|
||||
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User):
|
||||
async def action_cmd(self, ctx: ApplicationContext, type: str, user: User) -> None:
|
||||
await ctx.defer()
|
||||
|
||||
action = await config_get("category", "actions", type)
|
||||
action_verb = await config_get("action", "actions", type)
|
||||
action: str = await config_get("category", "actions", type)
|
||||
action_verb: str = await config_get("action", "actions", type)
|
||||
|
||||
image = await wafiu_pics.sfw(action)
|
||||
image_url: str = await waifu_pics.sfw(action)
|
||||
|
||||
logger.info(
|
||||
"User %s (%s) %s %s (%s) with image %s",
|
||||
@ -43,17 +42,17 @@ class Fun(commands.Cog):
|
||||
action_verb,
|
||||
guild_name(user),
|
||||
user.id,
|
||||
image,
|
||||
image_url,
|
||||
)
|
||||
|
||||
embed = Embed(
|
||||
embed: Embed = Embed(
|
||||
description=f"**{guild_name(ctx.user)}** {action_verb} **{guild_name(user)}**",
|
||||
color=0x2F3136,
|
||||
)
|
||||
embed.set_image(url=image)
|
||||
embed.set_image(url=image_url)
|
||||
|
||||
await ctx.respond(embed=embed)
|
||||
|
||||
|
||||
def setup(client: PycordBot):
|
||||
def setup(client: PycordBot) -> None:
|
||||
client.add_cog(Fun(client))
|
||||
|
@ -1,49 +1,51 @@
|
||||
from discord import Member, Message
|
||||
from typing import Dict, Any, Union
|
||||
|
||||
from discord import Member, Message, TextChannel
|
||||
from discord import utils as ds_utils
|
||||
from discord.ext import commands
|
||||
from libbot import config_get
|
||||
from libbot.pycord.classes import PycordBot
|
||||
|
||||
from modules.database import col_users
|
||||
from modules.utils import config_get
|
||||
|
||||
|
||||
class Logger(commands.Cog):
|
||||
def __init__(self, client: PycordBot):
|
||||
self.client = client
|
||||
self.client: PycordBot = client
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_message(self, message: Message):
|
||||
if (
|
||||
(message.author != self.client.user)
|
||||
and (message.author.bot == False)
|
||||
and (message.author.system == False)
|
||||
and (message.author.bot is False)
|
||||
and (message.author.system is False)
|
||||
):
|
||||
if col_users.find_one({"user": message.author.id}) is None:
|
||||
user = {}
|
||||
defaults = await config_get("user", "defaults")
|
||||
if (await col_users.find_one({"user": message.author.id})) is None:
|
||||
user: Dict[str, Any] = {}
|
||||
defaults: Dict[str, Any] = await config_get("user", "defaults")
|
||||
|
||||
user["user"] = message.author.id
|
||||
|
||||
for key in defaults:
|
||||
user[key] = defaults[key]
|
||||
|
||||
col_users.insert_one(document=user)
|
||||
await col_users.insert_one(document=user)
|
||||
|
||||
@commands.Cog.listener()
|
||||
async def on_member_join(self, member: Member):
|
||||
welcome_chan = ds_utils.get(
|
||||
async def on_member_join(self, member: Member) -> None:
|
||||
welcome_chan: Union[TextChannel, None] = ds_utils.get(
|
||||
self.client.get_guild(await config_get("guild")).channels,
|
||||
id=await config_get("welcome", "channels", "text"),
|
||||
)
|
||||
rules_chan = ds_utils.get(
|
||||
rules_chan: Union[TextChannel, None] = ds_utils.get(
|
||||
self.client.get_guild(await config_get("guild")).channels,
|
||||
id=await config_get("rules", "channels", "text"),
|
||||
)
|
||||
|
||||
if (
|
||||
(member != self.client.user)
|
||||
and (member.bot == False)
|
||||
and (member.system == False)
|
||||
and (member.bot is False)
|
||||
and (member.system is False)
|
||||
):
|
||||
await welcome_chan.send(
|
||||
content=(await config_get("welcome", "messages")).format(
|
||||
@ -51,17 +53,17 @@ class Logger(commands.Cog):
|
||||
)
|
||||
)
|
||||
|
||||
if col_users.find_one({"user": member.id}) is None:
|
||||
user = {}
|
||||
defaults = await config_get("user", "defaults")
|
||||
if (await col_users.find_one({"user": member.id})) is None:
|
||||
user: Dict[str, Any] = {}
|
||||
defaults: Dict[str, Any] = await config_get("user", "defaults")
|
||||
|
||||
user["user"] = member.id
|
||||
|
||||
for key in defaults:
|
||||
user[key] = defaults[key]
|
||||
|
||||
col_users.insert_one(document=user)
|
||||
await col_users.insert_one(document=user)
|
||||
|
||||
|
||||
def setup(client: PycordBot):
|
||||
def setup(client: PycordBot) -> None:
|
||||
client.add_cog(Logger(client))
|
||||
|
1
enums/__init__.py
Normal file
1
enums/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .colors import Color
|
@ -1,6 +1,7 @@
|
||||
from enum import IntEnum
|
||||
|
||||
|
||||
class Color(IntEnum):
|
||||
fail = 0xd6345b
|
||||
success = 0x84d961
|
||||
default = 0xa7a6ab
|
||||
FAIL = 0xD6345B
|
||||
SUCCESS = 0x84D961
|
||||
DEFAULT = 0xA7A6AB
|
||||
|
1
errors/__init__.py
Normal file
1
errors/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .user import UserNotFoundError
|
10
errors/user.py
Normal file
10
errors/user.py
Normal file
@ -0,0 +1,10 @@
|
||||
class UserNotFoundError(Exception):
|
||||
"""HoloUser could not find user with such an ID in database"""
|
||||
|
||||
def __init__(self, user, user_id):
|
||||
self.user = user
|
||||
self.user_id = user_id
|
||||
|
||||
super().__init__(
|
||||
f"User of type {type(self.user)} with id {self.user_id} was not found"
|
||||
)
|
22
main.py
22
main.py
@ -1,11 +1,13 @@
|
||||
import logging
|
||||
import sys
|
||||
from logging import Logger
|
||||
|
||||
from discord import Activity, ActivityType
|
||||
from libbot import config_get
|
||||
from libbot.sync import config_get as sync_config_get
|
||||
|
||||
from modules.client import client
|
||||
from modules.scheduled import scheduler
|
||||
from modules.utils import config_get
|
||||
from modules.utils_sync import config_get_sync
|
||||
from modules.scheduler import scheduler
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
@ -13,7 +15,7 @@ logging.basicConfig(
|
||||
datefmt="[%X]",
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger: Logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import uvloop # type: ignore
|
||||
@ -24,11 +26,11 @@ except ImportError:
|
||||
|
||||
|
||||
@client.event
|
||||
async def on_ready():
|
||||
async def on_ready() -> None:
|
||||
logger.info("Logged in as %s", client.user)
|
||||
|
||||
activity_type = await config_get("type", "status")
|
||||
activity_message = await config_get("message", "status")
|
||||
activity_type: str = await config_get("type", "status")
|
||||
activity_message: str = await config_get("message", "status")
|
||||
|
||||
if activity_type == "playing":
|
||||
await client.change_presence(
|
||||
@ -62,15 +64,15 @@ async def on_ready():
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
def main() -> None:
|
||||
client.load_extension("cogs")
|
||||
|
||||
try:
|
||||
scheduler.start()
|
||||
client.run(config_get_sync("bot_token", "bot"))
|
||||
client.run(sync_config_get("bot_token", "bot"))
|
||||
except KeyboardInterrupt:
|
||||
scheduler.shutdown()
|
||||
exit()
|
||||
sys.exit()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -1,6 +1,10 @@
|
||||
from discord import Intents
|
||||
from libbot.pycord.classes import PycordBot
|
||||
|
||||
intents = Intents().all()
|
||||
from modules.scheduler import scheduler
|
||||
|
||||
intents: Intents = Intents().all()
|
||||
|
||||
intents.members = True
|
||||
client = PycordBot(intents=intents)
|
||||
|
||||
client: PycordBot = PycordBot(intents=intents, scheduler=scheduler)
|
||||
|
@ -1,12 +1,19 @@
|
||||
from typing import Dict, Any
|
||||
|
||||
from async_pymongo import AsyncClient, AsyncCollection, AsyncDatabase
|
||||
from libbot.sync import config_get as sync_config_get
|
||||
from pymongo import MongoClient
|
||||
from ujson import loads
|
||||
from pymongo.synchronous.collection import Collection
|
||||
from pymongo.synchronous.database import Database
|
||||
|
||||
with open("config.json", "r", encoding="utf-8") as f:
|
||||
db_config = loads(f.read())["database"]
|
||||
f.close()
|
||||
db_config: Dict[str, Any] = sync_config_get("database")
|
||||
|
||||
db_client = MongoClient(
|
||||
"mongodb://{0}:{1}@{2}:{3}/{4}".format(
|
||||
con_string: str = (
|
||||
"mongodb://{0}:{1}/{2}".format(
|
||||
db_config["host"], db_config["port"], db_config["name"]
|
||||
)
|
||||
if db_config["user"] is None or db_config["password"] is None
|
||||
else "mongodb://{0}:{1}@{2}:{3}/{4}".format(
|
||||
db_config["user"],
|
||||
db_config["password"],
|
||||
db_config["host"],
|
||||
@ -14,14 +21,20 @@ db_client = MongoClient(
|
||||
db_config["name"],
|
||||
)
|
||||
)
|
||||
db = db_client.get_database(name=db_config["name"])
|
||||
|
||||
collections = db.list_collection_names()
|
||||
db_client: AsyncClient = AsyncClient(con_string)
|
||||
db_client_sync: MongoClient = MongoClient(con_string)
|
||||
|
||||
for collection in ["users", "warnings", "scheduler", "analytics"]:
|
||||
if not collection in collections:
|
||||
db.create_collection(collection)
|
||||
# Async declarations per default
|
||||
db: AsyncDatabase = db_client.get_database(name=db_config["name"])
|
||||
|
||||
col_users = db.get_collection("users")
|
||||
col_warnings = db.get_collection("warnings")
|
||||
col_analytics = db.get_collection("analytics")
|
||||
col_users: AsyncCollection = db.get_collection("users")
|
||||
col_warnings: AsyncCollection = db.get_collection("warnings")
|
||||
col_analytics: AsyncCollection = db.get_collection("analytics")
|
||||
|
||||
# Sync declarations as a fallback
|
||||
sync_db: Database = db_client_sync.get_database(name=db_config["name"])
|
||||
|
||||
sync_col_users: Collection = sync_db.get_collection("users")
|
||||
sync_col_warnings: Collection = sync_db.get_collection("warnings")
|
||||
sync_col_analytics: Collection = sync_db.get_collection("analytics")
|
||||
|
@ -1,3 +1,3 @@
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
|
||||
scheduler = AsyncIOScheduler()
|
||||
scheduler: AsyncIOScheduler = AsyncIOScheduler()
|
@ -1,38 +0,0 @@
|
||||
from typing import Any
|
||||
|
||||
import aiofiles
|
||||
from ujson import dumps, loads
|
||||
|
||||
|
||||
async def json_read(path: str) -> Any:
|
||||
async with aiofiles.open(path, mode="r", encoding="utf-8") as f:
|
||||
data = await f.read()
|
||||
return loads(data)
|
||||
|
||||
|
||||
async def json_write(data: Any, path: str) -> None:
|
||||
async with aiofiles.open(path, mode="w", encoding="utf-8") as f:
|
||||
await f.write(
|
||||
dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4)
|
||||
)
|
||||
|
||||
|
||||
async def config_get(key: str, *path: str) -> Any:
|
||||
this_key = await json_read("config.json")
|
||||
for dict_key in path:
|
||||
this_key = this_key[dict_key]
|
||||
return this_key[key]
|
||||
|
||||
|
||||
async def config_set(key: str, value: Any, *path: str) -> None:
|
||||
this_dict = await json_read("config.json")
|
||||
string = "this_dict"
|
||||
for arg in path:
|
||||
string += f'["{arg}"]'
|
||||
if type(value) in [str]:
|
||||
string += f'["{key}"] = "{value}"'
|
||||
else:
|
||||
string += f'["{key}"] = {value}'
|
||||
exec(string)
|
||||
await json_write(this_dict, "config.json")
|
||||
return
|
@ -1,45 +1,10 @@
|
||||
from typing import Any, Union
|
||||
from typing import Union
|
||||
|
||||
from discord import Member, User
|
||||
from ujson import dumps, loads
|
||||
|
||||
|
||||
def json_read_sync(path: str) -> Any:
|
||||
with open(path, mode="r", encoding="utf-8") as f:
|
||||
data = f.read()
|
||||
return loads(data)
|
||||
|
||||
|
||||
def json_write_sync(data: Any, path: str) -> None:
|
||||
with open(path, mode="w", encoding="utf-8") as f:
|
||||
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False, indent=4))
|
||||
|
||||
|
||||
def config_get_sync(key: str, *path: str) -> Any:
|
||||
this_key = json_read_sync("config.json")
|
||||
for dict_key in path:
|
||||
this_key = this_key[dict_key]
|
||||
return this_key[key]
|
||||
|
||||
|
||||
def config_set_sync(key: str, value: Any, *path: str) -> None:
|
||||
this_dict = json_read_sync("config.json")
|
||||
string = "this_dict"
|
||||
for arg in path:
|
||||
string += f'["{arg}"]'
|
||||
if type(value) in [str]:
|
||||
string += f'["{key}"] = "{value}"'
|
||||
else:
|
||||
string += f'["{key}"] = {value}'
|
||||
exec(string)
|
||||
json_write_sync(this_dict, "config.json")
|
||||
return
|
||||
|
||||
|
||||
def guild_name(member: Union[Member, User]) -> str:
|
||||
if isinstance(member, User):
|
||||
return member.name
|
||||
if member.nick == None:
|
||||
return member.name
|
||||
else:
|
||||
return member.nick
|
||||
|
||||
return member.name if member.nick is None else member.nick
|
||||
|
3
modules/waifu_pics.py
Normal file
3
modules/waifu_pics.py
Normal file
@ -0,0 +1,3 @@
|
||||
from WaifuPicsPython import WaifuAsync
|
||||
|
||||
waifu_pics: WaifuAsync = WaifuAsync()
|
@ -1,8 +1,10 @@
|
||||
aiofiles==23.2.1
|
||||
apscheduler==3.10.4
|
||||
pymongo~=4.7.3
|
||||
requests~=2.32.3
|
||||
# Waifu pics related dependencies (not listed directly by waifupics)
|
||||
aiohttp>=3.10.0
|
||||
requests>=2.32.2
|
||||
|
||||
aiofiles~=24.1.0
|
||||
apscheduler>=3.10.0
|
||||
async_pymongo==0.1.11
|
||||
libbot[speed,pycord]==3.2.3
|
||||
ujson~=5.10.0
|
||||
WaifuPicsPython==0.2.0
|
||||
--extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
|
||||
libbot[speed,pycord]==3.2.2
|
||||
WaifuPicsPython==0.2.0
|
Loading…
Reference in New Issue
Block a user