WIP: Custom channels

This commit is contained in:
Profitroll 2023-05-04 16:09:47 +02:00
parent 86de0dc6aa
commit b8cc18041e
7 changed files with 381 additions and 2 deletions

220
classes/holo_user.py Normal file
View File

@ -0,0 +1,220 @@
from datetime import datetime, timezone
from random import randint
from typing import Union, Any
import discord
import discord.member
from modules.utils import config_get
from modules.database import col_users, col_warnings
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
class NotEnoughMoneyError(Exception):
"""User does not have enough money to do that"""
pass
class UserNotFoundError(Exception):
"""HoloUser could not find user with such an ID in database"""
def __init__(self, user, user_id):
self.user = user
self.user_id = user_id
super().__init__(f"User of type {type(self.user)} with id {self.user_id} was not found")
class HoloUser():
def __init__(self, user: Union[discord.User, discord.Member, discord.member.Member, int]) -> None:
"""Get an object that has a proper binding between Discord ID and database
### Args:
* `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted
### Raises:
* `UserNotFoundError`: User with such ID does not seem to exist in database
"""
if hasattr(user, "id"):
self.id = user.id # type: ignore
else:
self.id = user
jav_user = col_users.find_one({"user": self.id})
if jav_user is None:
raise UserNotFoundError(user=user, user_id=self.id)
self.db_id = jav_user["_id"]
# self.xp = jav_user["xp"]
# self.xp_next = jav_user["xp_next"]
# self.level = jav_user["level"]
# self.work_xp = jav_user["work_xp"]
# self.balance = jav_user["balance"]
self.customrole = jav_user["customrole"]
self.customchannel = jav_user["customchannel"]
# self.married = jav_user["married"]
# self.marriage_request = jav_user["marriage_request"]
# self.marriage_request_sent = jav_user["marriage_request_sent"]
# self.cooldown = jav_user["cooldown"]
self.warnings = self.warns()
# def xp_add(self, amount: int = 1) -> None:
# """Add some amount of XP points
# ### Args:
# * `amount` (int, optional): Amount of XP points to give. Defaults to 1.
# """
# self.xp += amount
# col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "xp": self.xp } })
# def xp_level_up(self) -> None:
# """Add 1 to the current XP level"""
# xp_diff = int(self.xp - self.xp_next)
# xp_next = int(self.xp_next*configGet("multiplier", "leveling")+configGet("addition", "leveling"))
# self.xp = xp_diff
# self.xp_next = xp_next
# self.level += 1
# col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "xp": xp_diff } })
# col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "xp_next": xp_next } })
# col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "level": self.level } })
# def balance_set(self, amount: int) -> None:
# """Set the balance to amount
# ### Args:
# * `amount` (int): Amount of currency to be set
# """
# self.balance = amount
# col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "balance": self.balance } })
# def balance_add(self, amount: int) -> None:
# """Add amount to the balance
# ### Args:
# * `amount` (int): Amount to be added
# """
# self.balance_set(self.balance+amount)
# def balance_take(self, amount: int) -> bool:
# """Take amount from the balance
# ### Args:
# * `amount` (int): Amount to be taken
# ### Returns:
# * `bool`: True if successful and False if not
# """
# if self.balance >= amount:
# self.balance_set(self.balance-amount)
# return True
# else:
# return False
# #raise NotEnoughMoneyError()
# def balance_transfer(self, amount: int, destination: Union[Any, int]) -> None:
# """Transfer money to another user
# ### Args:
# * `amount` (int): Amount to be transferred
# * `destination` (Union[Any, int]): Destination user of the transfer (should have attribute "id" or be id itself if int)
# ### Raises:
# * `NotEnoughMoneyError`: Not enough money to perform this transaction
# """
# if self.balance >= amount:
# if isinstance(destination, int):
# destination = HoloUser(destination)
# self.balance_take(amount)
# destination.balance_add(amount) # type: ignore
# else:
# raise NotEnoughMoneyError()
# def salary_get(self) -> int:
# """Get the salary level depending on work_xp
# ### Returns:
# * `int`: Amount of money to be earned
# """
# if self.work_xp >= 100:
# return randint(configGet("min", "work", "level", "4"), configGet("max", "work", "level", "4"))
# elif self.work_xp >= 50:
# return randint(configGet("min", "work", "level", "3"), configGet("max", "work", "level", "3"))
# elif self.work_xp > 10:
# return randint(configGet("min", "work", "level", "2"), configGet("max", "work", "level", "2"))
# else:
# return randint(configGet("min", "work", "level", "1"), configGet("max", "work", "level", "1"))
# def work_xp_add(self) -> None:
# self.set("work_xp", self.work_xp+1)
def warns(self) -> int:
"""Get number of warnings user has
### Returns:
* `int`: Number of warnings
"""
warns = col_warnings.find_one({"user": self.id})
if warns == None:
return 0
else:
return warns["warns"]
def warn(self, count=1, reason: str = "Not provided") -> None:
"""Warn and add count to warns number
### Args:
* `count` (int, optional): Count of warnings to be added. Defaults to 1.
"""
warns = col_warnings.find_one({"user": self.id})
if warns != None:
col_warnings.update_one(filter={"_id": self.db_id}, update={ "$set": { "warns": warns["warns"]+count } })
else:
col_warnings.insert_one(document={ "user": self.id, "warns": count })
logWrite(f"User {self.id} was warned {count} times due to: {reason}")
# def cooldown_go(self, kind: Literal["work", "daily", "weekly", "monthly", "steal"]) -> None:
# """Set cooldown start of kind now
# ### Args:
# * `kind` (Literal["work", "daily", "weekly", "monthly", "steal"]): Kind of a cooldown
# """
# self.cooldown[kind] = datetime.now(tz=timezone.utc)
# col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "cooldown": self.cooldown } })
def set(self, key: str, value: Any) -> None:
"""Set attribute data and save it into database
### Args:
* `key` (str): Attribute to be changed
* `value` (Any): Value to set
"""
if not hasattr(self, key):
raise AttributeError()
setattr(self, key, value)
col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { key: value } }, upsert=True)
logWrite(f"Set attribute {key} of user {self.id} to {value}")
def purge(self) -> None:
"""Completely remove data from database. Will not remove transactions logs and warnings."""
col_users.delete_one(filter={"_id": self.db_id})
self.unauthorize()
def unauthorize(self) -> None:
"""Cancel Oauth2 authorization"""
col_authorized.find_one_and_delete({"user": self.id})
# def is_authorized(self) -> bool:
# """Check if user provided Oauth2 authorization
# ### Returns:
# * `bool`: True if yes and False if no
# """
# if configGet("mode") == "secure":
# authorized = col_authorized.find_one({"user": self.id})
# if authorized is not None:
# return True
# else:
# return False
# else:
# return True

75
cogs/custom_channels.py Normal file
View File

@ -0,0 +1,75 @@
from discord import ApplicationContext, option, utils, Embed
from discord.ext import commands
from discord.commands import SlashCommandGroup
from classes.holo_user import HoloUser
from enums.colors import Color
from modules.utils import config_get
from modules.utils_sync import config_get_sync, guild_name
class CustomChannels(commands.Cog):
def __init__(self, client):
self.client = client
customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом") # discord.create_group("customrole", description="Керування особистою роллю")
@customchannel.command(name="buy", description="Отримати персональний текстовий канал", guild_ids=config_get_sync("guilds"))
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки")
async def customchannel_buy_cmd(self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool):
holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel == None:
await ctx.defer()
created_channel = await ctx.user.guild.create_text_channel(name=name,
reason=f"Користувач {guild_name(ctx.user)} купив канал",
category=utils.get(ctx.author.guild.categories, id=await config_get("customchannel", "categories")),
)
await created_channel.set_permissions(ctx.user.guild.default_role, send_messages=False, add_reactions=reactions, create_public_threads=threads, create_private_threads=threads)
await created_channel.set_permissions(ctx.user, attach_files=True, manage_messages=True, send_messages=True, embed_links=True, manage_channels=True)
# col_users.update_one(filter={"_id": holo_user_ctx.db_id}, update={ "$set": { "customchannel": created_channel.id } })
holo_user_ctx.set("customchannel", created_channel.id)
await ctx.respond(embed=Embed(title="Створено канал", description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`", color=Color.success))
bots = await config_get("bots")
for bot in bots:
await created_channel.set_permissions(utils.get(ctx.user.guild.roles, id=bots[bot]["role"]), view_channel=False)
else:
await ctx.defer(ephemeral=True)
await ctx.respond(embed=Embed(title="Помилка виконання", description=f"У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.", color=Color.fail))
@customchannel.command(name="edit", description="Змінити параметри особистого каналу", guild_ids=config_get_sync("guilds"))
@option("name", description="Назва каналу")
@option("reactions", description="Дозволити реакції")
@option("threads", description="Дозволити гілки")
async def customchannel_edit_cmd(self, ctx: ApplicationContext, name:str, reactions: bool, threads: bool):
holo_user_ctx = HoloUser(ctx.user)
custom_channel = utils.get(ctx.guild.channels, id=holo_user_ctx.customchannel)
if custom_channel is None:
await ctx.respond(embed=Embed(title="Канал не знайдено", description=f"Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", color=Color.fail))
return
await custom_channel.edit(name=name)
await custom_channel.set_permissions(ctx.user.guild.default_role, send_messages=False, add_reactions=reactions, create_public_threads=threads, create_private_threads=threads)
await ctx.respond(embed=Embed(title="Канал змінено", description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`", color=Color.fail))
@customchannel.command(name="refund", description="Відібрати канал, знищуючи його, та частково повернути кошти", guild_ids=config_get_sync("guilds"))
async def customchannel_refund_cmd(self, ctx: ApplicationContext):
holo_user_ctx = HoloUser(ctx.user)
if holo_user_ctx.customchannel is not None:
await ctx.defer()
custom_channel = utils.get(ctx.guild.channels, id=holo_user_ctx.customchannel)
if custom_channel is None:
await ctx.respond(embed=Embed(title="Канал не знайдено", description=f"Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", color=Color.fail))
holo_user_ctx.set("customchannel", None)
return
await custom_channel.delete(reason="Повернення коштів")
holo_user_ctx.set("customchannel", None)
await ctx.respond(embed=Embed(title="Канал знищено", description=f"Ви відмовились від каналу.", color=Color.default))
else:
await ctx.defer(ephemeral=True)
await ctx.respond(embed=Embed(title="Помилка виконання", description=f"У вас немає особистого каналу.", color=Color.fail))

6
enums/colors.py Normal file
View File

@ -0,0 +1,6 @@
from enum import IntEnum
class Color(IntEnum):
fail = 0xd6345b
success = 0x84d961
default = 0xa7a6ab

View File

@ -6,6 +6,9 @@ from discord import Activity, ActivityType
from modules.scheduled import scheduler from modules.scheduled import scheduler
from modules.client import client from modules.client import client
from modules.utils import config_get from modules.utils import config_get
from modules.utils_sync import config_get_sync
from cogs.custom_channels import CustomChannels
logging.basicConfig( logging.basicConfig(
level=logging.INFO, level=logging.INFO,
@ -30,9 +33,11 @@ async def on_ready():
await client.change_presence(activity=Activity(type=ActivityType.listening, name=await config_get("status"))) await client.change_presence(activity=Activity(type=ActivityType.listening, name=await config_get("status")))
def main(): def main():
client.add_cog(CustomChannels(client))
try: try:
scheduler.start() scheduler.start()
client.run(asyncio.run(config_get("token"))) client.run(config_get_sync("token"))
except KeyboardInterrupt: except KeyboardInterrupt:
scheduler.shutdown() scheduler.shutdown()
exit() exit()

30
modules/database.py Normal file
View File

@ -0,0 +1,30 @@
from pymongo import MongoClient
from ujson import loads
with open("config.json", "r", encoding="utf-8") as f:
db_config = loads(f.read())["database"]
f.close()
db_client = MongoClient(
'mongodb://{0}:{1}@{2}:{3}/{4}'.format(
db_config["user"],
db_config["password"],
db_config["host"],
db_config["port"],
db_config["name"]
)
)
db = db_client.get_database(name=db_config["name"])
collections = db.list_collection_names()
for collection in ["users", "warnings", "scheduler"]:
if not collection in collections:
db.create_collection(collection)
col_users = db.get_collection("users")
col_warnings = db.get_collection("warnings")
# col_checkouts = db.get_collection("checkouts")
# col_trackings = db.get_collection("trackings")
# col_authorized = db.get_collection("authorized")
# col_transactions = db.get_collection("transactions")

View File

@ -1,6 +1,7 @@
from typing import Any from typing import Any
import aiofiles import aiofiles
from ujson import loads, dumps from ujson import dumps, loads
async def json_read(path: str) -> Any: async def json_read(path: str) -> Any:

42
modules/utils_sync.py Normal file
View File

@ -0,0 +1,42 @@
from typing import Any
from discord import Member
from ujson import dumps, loads
def json_read_sync(path: str) -> Any:
with open(path, mode="r", encoding="utf-8") as f:
data = f.read()
return loads(data)
def json_write_sync(data: Any, path: str) -> None:
with open(path, mode="w", encoding="utf-8") as f:
f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False))
def config_get_sync(key: str, *path: str) -> Any:
this_key = json_read_sync("config.json")
for dict_key in path:
this_key = this_key[dict_key]
return this_key[key]
def config_set_sync(key: str, value: Any, *path: str) -> None:
this_dict = json_read_sync("config.json")
string = "this_dict"
for arg in path:
string += f'["{arg}"]'
if type(value) in [str]:
string += f'["{key}"] = "{value}"'
else:
string += f'["{key}"] = {value}'
exec(string)
json_write_sync(this_dict, "config.json")
return
def guild_name(member: Member):
if member.nick == None:
return member.name
else:
return member.nick