diff --git a/classes/holo_user.py b/classes/holo_user.py new file mode 100644 index 0000000..2c2fc58 --- /dev/null +++ b/classes/holo_user.py @@ -0,0 +1,220 @@ +from datetime import datetime, timezone +from random import randint +from typing import Union, Any +import discord +import discord.member +from modules.utils import config_get +from modules.database import col_users, col_warnings + +try: + from typing import Literal +except ImportError: + from typing_extensions import Literal + +class NotEnoughMoneyError(Exception): + """User does not have enough money to do that""" + pass + +class UserNotFoundError(Exception): + """HoloUser could not find user with such an ID in database""" + def __init__(self, user, user_id): + self.user = user + self.user_id = user_id + super().__init__(f"User of type {type(self.user)} with id {self.user_id} was not found") + +class HoloUser(): + + def __init__(self, user: Union[discord.User, discord.Member, discord.member.Member, int]) -> None: + """Get an object that has a proper binding between Discord ID and database + + ### Args: + * `user` (Union[discord.User, discord.Member, discord.member.Member, int]): Object from which ID can be extracted + + ### Raises: + * `UserNotFoundError`: User with such ID does not seem to exist in database + """ + + if hasattr(user, "id"): + self.id = user.id # type: ignore + else: + self.id = user + + jav_user = col_users.find_one({"user": self.id}) + + if jav_user is None: + raise UserNotFoundError(user=user, user_id=self.id) + + self.db_id = jav_user["_id"] + + # self.xp = jav_user["xp"] + # self.xp_next = jav_user["xp_next"] + # self.level = jav_user["level"] + # self.work_xp = jav_user["work_xp"] + # self.balance = jav_user["balance"] + self.customrole = jav_user["customrole"] + self.customchannel = jav_user["customchannel"] + # self.married = jav_user["married"] + # self.marriage_request = jav_user["marriage_request"] + # self.marriage_request_sent = jav_user["marriage_request_sent"] + # self.cooldown = jav_user["cooldown"] + self.warnings = self.warns() + + # def xp_add(self, amount: int = 1) -> None: + # """Add some amount of XP points + + # ### Args: + # * `amount` (int, optional): Amount of XP points to give. Defaults to 1. + # """ + # self.xp += amount + # col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "xp": self.xp } }) + + # def xp_level_up(self) -> None: + # """Add 1 to the current XP level""" + # xp_diff = int(self.xp - self.xp_next) + # xp_next = int(self.xp_next*configGet("multiplier", "leveling")+configGet("addition", "leveling")) + # self.xp = xp_diff + # self.xp_next = xp_next + # self.level += 1 + # col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "xp": xp_diff } }) + # col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "xp_next": xp_next } }) + # col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "level": self.level } }) + + # def balance_set(self, amount: int) -> None: + # """Set the balance to amount + + # ### Args: + # * `amount` (int): Amount of currency to be set + # """ + # self.balance = amount + # col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "balance": self.balance } }) + + # def balance_add(self, amount: int) -> None: + # """Add amount to the balance + + # ### Args: + # * `amount` (int): Amount to be added + # """ + # self.balance_set(self.balance+amount) + + # def balance_take(self, amount: int) -> bool: + # """Take amount from the balance + + # ### Args: + # * `amount` (int): Amount to be taken + + # ### Returns: + # * `bool`: True if successful and False if not + # """ + # if self.balance >= amount: + # self.balance_set(self.balance-amount) + # return True + # else: + # return False + # #raise NotEnoughMoneyError() + + # def balance_transfer(self, amount: int, destination: Union[Any, int]) -> None: + # """Transfer money to another user + + # ### Args: + # * `amount` (int): Amount to be transferred + # * `destination` (Union[Any, int]): Destination user of the transfer (should have attribute "id" or be id itself if int) + + # ### Raises: + # * `NotEnoughMoneyError`: Not enough money to perform this transaction + # """ + # if self.balance >= amount: + # if isinstance(destination, int): + # destination = HoloUser(destination) + # self.balance_take(amount) + # destination.balance_add(amount) # type: ignore + # else: + # raise NotEnoughMoneyError() + + # def salary_get(self) -> int: + # """Get the salary level depending on work_xp + + # ### Returns: + # * `int`: Amount of money to be earned + # """ + # if self.work_xp >= 100: + # return randint(configGet("min", "work", "level", "4"), configGet("max", "work", "level", "4")) + # elif self.work_xp >= 50: + # return randint(configGet("min", "work", "level", "3"), configGet("max", "work", "level", "3")) + # elif self.work_xp > 10: + # return randint(configGet("min", "work", "level", "2"), configGet("max", "work", "level", "2")) + # else: + # return randint(configGet("min", "work", "level", "1"), configGet("max", "work", "level", "1")) + + # def work_xp_add(self) -> None: + # self.set("work_xp", self.work_xp+1) + + def warns(self) -> int: + """Get number of warnings user has + + ### Returns: + * `int`: Number of warnings + """ + warns = col_warnings.find_one({"user": self.id}) + if warns == None: + return 0 + else: + return warns["warns"] + + def warn(self, count=1, reason: str = "Not provided") -> None: + """Warn and add count to warns number + + ### Args: + * `count` (int, optional): Count of warnings to be added. Defaults to 1. + """ + warns = col_warnings.find_one({"user": self.id}) + if warns != None: + col_warnings.update_one(filter={"_id": self.db_id}, update={ "$set": { "warns": warns["warns"]+count } }) + else: + col_warnings.insert_one(document={ "user": self.id, "warns": count }) + logWrite(f"User {self.id} was warned {count} times due to: {reason}") + + # def cooldown_go(self, kind: Literal["work", "daily", "weekly", "monthly", "steal"]) -> None: + # """Set cooldown start of kind now + + # ### Args: + # * `kind` (Literal["work", "daily", "weekly", "monthly", "steal"]): Kind of a cooldown + # """ + # self.cooldown[kind] = datetime.now(tz=timezone.utc) + # col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { "cooldown": self.cooldown } }) + + def set(self, key: str, value: Any) -> None: + """Set attribute data and save it into database + + ### Args: + * `key` (str): Attribute to be changed + * `value` (Any): Value to set + """ + if not hasattr(self, key): + raise AttributeError() + setattr(self, key, value) + col_users.update_one(filter={"_id": self.db_id}, update={ "$set": { key: value } }, upsert=True) + logWrite(f"Set attribute {key} of user {self.id} to {value}") + + def purge(self) -> None: + """Completely remove data from database. Will not remove transactions logs and warnings.""" + col_users.delete_one(filter={"_id": self.db_id}) + self.unauthorize() + + def unauthorize(self) -> None: + """Cancel Oauth2 authorization""" + col_authorized.find_one_and_delete({"user": self.id}) + + # def is_authorized(self) -> bool: + # """Check if user provided Oauth2 authorization + + # ### Returns: + # * `bool`: True if yes and False if no + # """ + # if configGet("mode") == "secure": + # authorized = col_authorized.find_one({"user": self.id}) + # if authorized is not None: + # return True + # else: + # return False + # else: + # return True \ No newline at end of file diff --git a/cogs/custom_channels.py b/cogs/custom_channels.py new file mode 100644 index 0000000..5d255f8 --- /dev/null +++ b/cogs/custom_channels.py @@ -0,0 +1,75 @@ +from discord import ApplicationContext, option, utils, Embed +from discord.ext import commands +from discord.commands import SlashCommandGroup +from classes.holo_user import HoloUser +from enums.colors import Color +from modules.utils import config_get +from modules.utils_sync import config_get_sync, guild_name + +class CustomChannels(commands.Cog): + + def __init__(self, client): + self.client = client + + customchannel = SlashCommandGroup("customchannel", "Керування особистим каналом") # discord.create_group("customrole", description="Керування особистою роллю") + + @customchannel.command(name="buy", description="Отримати персональний текстовий канал", guild_ids=config_get_sync("guilds")) + @option("name", description="Назва каналу") + @option("reactions", description="Дозволити реакції") + @option("threads", description="Дозволити гілки") + async def customchannel_buy_cmd(self, ctx: ApplicationContext, name: str, reactions: bool, threads: bool): + + holo_user_ctx = HoloUser(ctx.user) + + if holo_user_ctx.customchannel == None: + await ctx.defer() + created_channel = await ctx.user.guild.create_text_channel(name=name, + reason=f"Користувач {guild_name(ctx.user)} купив канал", + category=utils.get(ctx.author.guild.categories, id=await config_get("customchannel", "categories")), + ) + await created_channel.set_permissions(ctx.user.guild.default_role, send_messages=False, add_reactions=reactions, create_public_threads=threads, create_private_threads=threads) + await created_channel.set_permissions(ctx.user, attach_files=True, manage_messages=True, send_messages=True, embed_links=True, manage_channels=True) + # col_users.update_one(filter={"_id": holo_user_ctx.db_id}, update={ "$set": { "customchannel": created_channel.id } }) + holo_user_ctx.set("customchannel", created_channel.id) + await ctx.respond(embed=Embed(title="Створено канал", description=f"Вітаємо! Ви створили канал {created_channel.mention}. Для керування ним користуйтесь меню налаштувань каналу а також командою `/customchannel edit`", color=Color.success)) + bots = await config_get("bots") + for bot in bots: + await created_channel.set_permissions(utils.get(ctx.user.guild.roles, id=bots[bot]["role"]), view_channel=False) + else: + await ctx.defer(ephemeral=True) + await ctx.respond(embed=Embed(title="Помилка виконання", description=f"У вас вже є особистий канал.\nДля редагування каналу є `/customchannel edit` або просто відкрийте меню керування вашим каналом.", color=Color.fail)) + + @customchannel.command(name="edit", description="Змінити параметри особистого каналу", guild_ids=config_get_sync("guilds")) + @option("name", description="Назва каналу") + @option("reactions", description="Дозволити реакції") + @option("threads", description="Дозволити гілки") + async def customchannel_edit_cmd(self, ctx: ApplicationContext, name:str, reactions: bool, threads: bool): + + holo_user_ctx = HoloUser(ctx.user) + + custom_channel = utils.get(ctx.guild.channels, id=holo_user_ctx.customchannel) + if custom_channel is None: + await ctx.respond(embed=Embed(title="Канал не знайдено", description=f"Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", color=Color.fail)) + return + await custom_channel.edit(name=name) + await custom_channel.set_permissions(ctx.user.guild.default_role, send_messages=False, add_reactions=reactions, create_public_threads=threads, create_private_threads=threads) + await ctx.respond(embed=Embed(title="Канал змінено", description=f"Назва каналу тепер `{name}`, реакції `{reactions}` та дозволено треди `{threads}`", color=Color.fail)) + + @customchannel.command(name="refund", description="Відібрати канал, знищуючи його, та частково повернути кошти", guild_ids=config_get_sync("guilds")) + async def customchannel_refund_cmd(self, ctx: ApplicationContext): + + holo_user_ctx = HoloUser(ctx.user) + + if holo_user_ctx.customchannel is not None: + await ctx.defer() + custom_channel = utils.get(ctx.guild.channels, id=holo_user_ctx.customchannel) + if custom_channel is None: + await ctx.respond(embed=Embed(title="Канал не знайдено", description=f"Канал, вказаний як ваш, не існує. Можливо, його було вручну видалено раніше.", color=Color.fail)) + holo_user_ctx.set("customchannel", None) + return + await custom_channel.delete(reason="Повернення коштів") + holo_user_ctx.set("customchannel", None) + await ctx.respond(embed=Embed(title="Канал знищено", description=f"Ви відмовились від каналу.", color=Color.default)) + else: + await ctx.defer(ephemeral=True) + await ctx.respond(embed=Embed(title="Помилка виконання", description=f"У вас немає особистого каналу.", color=Color.fail)) diff --git a/enums/colors.py b/enums/colors.py new file mode 100644 index 0000000..8f647b2 --- /dev/null +++ b/enums/colors.py @@ -0,0 +1,6 @@ +from enum import IntEnum + +class Color(IntEnum): + fail = 0xd6345b + success = 0x84d961 + default = 0xa7a6ab \ No newline at end of file diff --git a/main.py b/main.py index 673aff9..9bb782c 100644 --- a/main.py +++ b/main.py @@ -6,6 +6,9 @@ from discord import Activity, ActivityType from modules.scheduled import scheduler from modules.client import client from modules.utils import config_get +from modules.utils_sync import config_get_sync + +from cogs.custom_channels import CustomChannels logging.basicConfig( level=logging.INFO, @@ -30,9 +33,11 @@ async def on_ready(): await client.change_presence(activity=Activity(type=ActivityType.listening, name=await config_get("status"))) def main(): + client.add_cog(CustomChannels(client)) + try: scheduler.start() - client.run(asyncio.run(config_get("token"))) + client.run(config_get_sync("token")) except KeyboardInterrupt: scheduler.shutdown() exit() diff --git a/modules/database.py b/modules/database.py new file mode 100644 index 0000000..c64d0d4 --- /dev/null +++ b/modules/database.py @@ -0,0 +1,30 @@ +from pymongo import MongoClient +from ujson import loads + +with open("config.json", "r", encoding="utf-8") as f: + db_config = loads(f.read())["database"] + f.close() + +db_client = MongoClient( + 'mongodb://{0}:{1}@{2}:{3}/{4}'.format( + db_config["user"], + db_config["password"], + db_config["host"], + db_config["port"], + db_config["name"] + ) +) +db = db_client.get_database(name=db_config["name"]) + +collections = db.list_collection_names() + +for collection in ["users", "warnings", "scheduler"]: + if not collection in collections: + db.create_collection(collection) + +col_users = db.get_collection("users") +col_warnings = db.get_collection("warnings") +# col_checkouts = db.get_collection("checkouts") +# col_trackings = db.get_collection("trackings") +# col_authorized = db.get_collection("authorized") +# col_transactions = db.get_collection("transactions") \ No newline at end of file diff --git a/modules/utils.py b/modules/utils.py index d28a6d2..8fb08b2 100644 --- a/modules/utils.py +++ b/modules/utils.py @@ -1,6 +1,7 @@ from typing import Any + import aiofiles -from ujson import loads, dumps +from ujson import dumps, loads async def json_read(path: str) -> Any: diff --git a/modules/utils_sync.py b/modules/utils_sync.py new file mode 100644 index 0000000..6952d1b --- /dev/null +++ b/modules/utils_sync.py @@ -0,0 +1,42 @@ +from typing import Any + +from discord import Member +from ujson import dumps, loads + + +def json_read_sync(path: str) -> Any: + with open(path, mode="r", encoding="utf-8") as f: + data = f.read() + return loads(data) + + +def json_write_sync(data: Any, path: str) -> None: + with open(path, mode="w", encoding="utf-8") as f: + f.write(dumps(data, ensure_ascii=False, escape_forward_slashes=False)) + + +def config_get_sync(key: str, *path: str) -> Any: + this_key = json_read_sync("config.json") + for dict_key in path: + this_key = this_key[dict_key] + return this_key[key] + + +def config_set_sync(key: str, value: Any, *path: str) -> None: + this_dict = json_read_sync("config.json") + string = "this_dict" + for arg in path: + string += f'["{arg}"]' + if type(value) in [str]: + string += f'["{key}"] = "{value}"' + else: + string += f'["{key}"] = {value}' + exec(string) + json_write_sync(this_dict, "config.json") + return + +def guild_name(member: Member): + if member.nick == None: + return member.name + else: + return member.nick \ No newline at end of file