from gzip import open as gzipopen from datetime import datetime from os import listdir, makedirs, remove, stat, path from shutil import copyfileobj from typing import Any, Literal, Union from discord import ( utils, Embed, TextChannel, VoiceChannel, PermissionOverwrite, Guild, CategoryChannel, Member, Client, ApplicationContext, Message, ) try: from ujson import loads, dumps except ImportError: from json import loads, dumps # path = Path(__file__).resolve().parent log_size = 512 # This is the default option for "debug" key in # file config.json, so if cebug is not set in it # bot will use this value instead. debug = False try: with open("config.json", "r", encoding="utf-8") as json_file: output = loads(json_file.read()) json_file.close() debug = output["debug"] except: debug = debug # Check latest log size def checkSize(): try: if not path.isdir("logs"): makedirs("logs", exist_ok=True) log = stat("logs/latest.log") global log_size if (log.st_size / 1024) > log_size: with open("logs/latest.log", "rb") as f_in: with gzipopen( f'logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz', "wb" ) as f_out: copyfileobj(f_in, f_out) print( f'Copied logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz' ) open("logs/latest.log", "w").close() except FileNotFoundError: print("Not found") pass # Append string to log def appendLog(message: str, guild: Union[Guild, None] = None, announce=True) -> None: global debug if guild == None: message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] {message}' else: if debug: message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] [{guild} | {str(guild.id)}] {message}' else: message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] [{guild}] {message}' if announce: print(message_formatted, flush=True) checkSize() log = open("logs/latest.log", "a") # type: ignore log.write(f"{message_formatted}\n") log.close() def saveJson(value: Any, filename: str) -> None: with open(filename, "w", encoding="utf-8") as f: f.write(dumps(value, indent=4, ensure_ascii=False)) f.close() def loadJson(filename: str) -> Any: global debug try: with open(filename, "r", encoding="utf-8") as json_file: output = loads(json_file.read()) json_file.close() except Exception as exp: if debug: appendLog(f"Could not load json file {filename} due to exception {exp}") output = {} return output def colorToStr(): pass def strToColor(string: str) -> int: return int(hex(int(string.replace("#", ""), 16)), 0) def gotCommand(message: Message) -> None: global debug if debug: appendLog( f"Command '{message.content}' from {message.author} ({str(message.author.id)})", message.guild, ) else: appendLog(f"Command '{message.content}' from {message.author}", message.guild) def guildConfGet(guild: Guild, variable: str) -> Any: global debug try: config = loadJson(f"guilds/{str(guild.id)}/config.json") return config[variable] except Exception as exp: if debug: appendLog( f"Could not get guild config key '{variable}' due to {exp}", guild ) return None def guildConfSet(guild: Guild, variable: str, value: Any) -> None: config = loadJson(f"guilds/{str(guild.id)}/config.json") config[variable] = value try: saveJson(config, f"guilds/{str(guild.id)}/config.json") except: makedirs(f"guilds/{str(guild.id)}", exist_ok=True) makedirs(f"guilds/{str(guild.id)}/channels", exist_ok=True) saveJson(config, f"guilds/{str(guild.id)}/config.json") appendLog(f"Guild config key '{variable}' is now set to '{value}'", guild) def guildConfReset(guild: Guild, variable: str) -> None: try: config = loadJson(f"guilds/{str(guild.id)}/config.json") del config[variable] try: saveJson(config, f"guilds/{str(guild.id)}/config.json") except: makedirs(f"guilds/{str(guild.id)}", exist_ok=True) makedirs(f"guilds/{str(guild.id)}/channels", exist_ok=True) saveJson(config, f"guilds/{str(guild.id)}/config.json") appendLog(f"Guild config key '{variable}' has been reset", guild) except Exception as exp: appendLog(f"Could not reset guild config key '{variable}' due to {exp}", guild) def guildLocaleGet(guild: Guild) -> str: config = loadJson(f"config.json") try: locale = guildConfGet(guild, "locale") except: return config["bot_locale"] if locale is None: return config["bot_locale"] else: return locale def getMsg(string: str, guild: Union[Guild, None] = None) -> str: try: locale = loadJson(f"locale/{guildLocaleGet(guild)}.json") return locale["messages"][string] except Exception as exp: appendLog( f"Could not get locale string named {string} due to exception {exp}", guild ) return string def makeEmbed(title="", description="", footer="", color=0xFFFFFF) -> Embed: embed = Embed(title=title, description=description, color=color) if footer is not None: embed.set_footer(text=footer) return embed def channelExists( number: int, guild: Guild, type: Literal["Any", "Text", "Voice"] = "Any" ) -> bool: global debug if number == None: return False try: if type == "Voice": selected_channel = utils.get(guild.channels, id=number) if isinstance(selected_channel, VoiceChannel): return True elif type == "Text": selected_channel = utils.get(guild.channels, id=number) if isinstance(selected_channel, TextChannel): return True elif type == "Any": selected_channel = utils.get(guild.channels, id=number) return True except Exception as exp: if debug: appendLog(f"Channel ID {str(number)} is not a channel due to {exp}") return False def channelGetName(number: int, guild: Guild): global debug try: selected_channel = utils.get(guild.channels, id=number) return selected_channel.name except Exception as exp: if debug: appendLog(f"Channel ID {str(number)} is not a channel due to {exp}") return "Channel doesn't exist" def isUserVoice(vc: VoiceChannel) -> bool: try: channels_list = listdir(f"guilds/{str(vc.guild.id)}/channels/") if f"{str(vc.id)}.json" in channels_list: return True else: return False except: return False async def removeUserVoice(vc: VoiceChannel) -> None: global debug channels_list = listdir(f"guilds/{str(vc.guild.id)}/channels/") if f"{vc.id}.json" in channels_list: vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" vc_conf = loadJson(vc_file) needed_channel = utils.get(vc.guild.channels, id=vc.id) if loadJson("config.json")["enable_nomic"]: nomic_channel = utils.get(vc.guild.channels, id=vc_conf["nomic"]) remove(vc_file) await needed_channel.delete() if debug: appendLog( f"Removed voice channel '{needed_channel}' ({str(needed_channel.id)}) of user with id {str(vc_conf['ownerid'])}", guild=vc.guild, ) else: appendLog( f"Removed voice channel '{needed_channel}' of user with id {str(vc_conf['ownerid'])}", guild=vc.guild, ) if loadJson("config.json")["enable_nomic"]: await nomic_channel.delete() if debug: appendLog( f"Removed nomic channel {nomic_channel} ({str(nomic_channel.id)}) of channel with id {str(needed_channel.id)}", guild=vc.guild, ) else: appendLog( f"Removed nomic channel '{nomic_channel}' of channel with id {str(needed_channel.id)}", guild=vc.guild, ) else: return async def createUserVoice( vc: VoiceChannel, category: CategoryChannel, member: Member ) -> VoiceChannel: global debug chan = {} overwrites_channel = { vc.guild.default_role: PermissionOverwrite(view_channel=True), vc.guild.me: PermissionOverwrite( read_messages=True, view_channel=True, manage_channels=True ), member: PermissionOverwrite( read_messages=True, view_channel=True, manage_channels=True ), } overwrites_nomic = { vc.guild.default_role: PermissionOverwrite( view_channel=False, read_messages=False ), vc.guild.me: PermissionOverwrite( read_messages=True, view_channel=True, manage_channels=True ), member: PermissionOverwrite( read_messages=True, view_channel=True, manage_channels=True ), } created_channel = await vc.guild.create_voice_channel( getMsg("name_voice", vc.guild).format(member.name), category=category, overwrites=overwrites_channel, ) if debug: appendLog( f"Created voice channel '{created_channel}' ({str(created_channel.id)}) for user {member} ({str(member.id)})", guild=vc.guild, ) else: appendLog( f"Created voice channel '{created_channel}' for user {member}", guild=vc.guild, ) if not path.isdir(f"guilds/{str(created_channel.guild.id)}/channels"): makedirs(f"guilds/{str(created_channel.guild.id)}/channels", exist_ok=True) vc_file = f"guilds/{str(created_channel.guild.id)}/channels/{str(created_channel.id)}.json" chan["ownerid"] = member.id saveJson(chan, vc_file) if loadJson("config.json")["enable_nomic"]: nomic_channel = await vc.guild.create_text_channel( getMsg("name_nomic", vc.guild).format(created_channel.id), category=category, overwrites=overwrites_nomic, topic=getMsg("description_nomic", vc.guild).format(str(created_channel.id)), ) if debug: appendLog( f"Created nomic channel '{nomic_channel}' ({str(nomic_channel.id)}) for channel '{created_channel}' ({str(created_channel.id)})", guild=vc.guild, ) else: appendLog( f"Created nomic channel '{nomic_channel}' for channel '{created_channel}'", guild=vc.guild, ) chan["nomic"] = nomic_channel.id saveJson(chan, vc_file) return created_channel def isVoiceOfUser(vc: VoiceChannel, member: Member) -> bool: vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" vc_conf = loadJson(vc_file) if vc_conf["ownerid"] == member.id: return True else: return False async def changeNomicPerms( mode: Literal["deny", "allow"], vc: VoiceChannel, member: Member ) -> None: vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" vc_conf = loadJson(vc_file) if loadJson("config.json")["enable_nomic"]: nomic_channel = utils.get(vc.guild.channels, id=vc_conf["nomic"]) if mode == "deny": await nomic_channel.set_permissions(member, view_channel=False) else: await nomic_channel.set_permissions(member, view_channel=True) async def clearTrash(client: Client): if not path.isdir(f"guilds/"): makedirs(f"guilds", exist_ok=True) guilds_list = listdir(f"guilds/") for guild in guilds_list: guild_object = client.get_guild(int(guild)) if path.isdir(f"guilds/{guild}/channels"): channels_list = listdir(f"guilds/{guild}/channels/") for channel in channels_list: channel_id = channel[:-5] try: selected_channel = utils.get( guild_object.voice_channels, id=int(channel_id) ) channel_owner = loadJson(f"guilds/{guild}/channels/{channel}")[ "ownerid" ] remove_channel = True for member in selected_channel.members: if member.id == channel_owner: remove_channel = False if remove_channel: await removeUserVoice(selected_channel) except: remove(f"guilds/{guild}/channels/{channel_id}.json") # async def autoClearTrash(client): # execute clearTrash every 120 seconds def getHelpMessage(ctx: ApplicationContext, version: float) -> Embed: # channelExists(number, guild, type="Voice") config = loadJson("config.json") if ctx.guild is not None: if channelExists(guildConfGet(ctx.guild, "channel"), ctx.guild, type="Voice"): desc_channel = getMsg("help_channel_set", guild=ctx.guild).format( channelGetName(guildConfGet(ctx.guild, "channel"), ctx.guild) ) else: desc_channel = getMsg("help_channel_none", guild=ctx.guild) if channelExists(guildConfGet(ctx.guild, "category"), ctx.guild, type="Any"): desc_category = getMsg("help_category_set", guild=ctx.guild).format( channelGetName(guildConfGet(ctx.guild, "category"), ctx.guild) ) else: desc_category = getMsg("help_category_none", guild=ctx.guild) desc_locale = getMsg("help_locale", guild=ctx.guild).format( getMsg("locale_name", ctx.guild) ) description = "\n".join([desc_locale, desc_channel, desc_category]) embed = Embed( title=getMsg("help_title", ctx.guild), description=description, color=strToColor(config["color_default"]), ) else: embed = Embed( title=getMsg("help_title_dm", ctx.guild), color=strToColor(config["color_default"]), ) embed.set_author( name=f'{config["bot_name"]} v{str(version)}', url=config["bot_site"], icon_url=config["bot_icon"], ) if ctx.author.id == config["owner"]: embed.add_field( name=f"/shutdown", value=getMsg("help_cmd_shutdown", ctx.guild), inline=False, ) embed.add_field( name=f"/channel set", value=getMsg("help_cmd_channel", ctx.guild), inline=False ) embed.add_field( name=f"/category set", value=getMsg("help_cmd_category", ctx.guild), inline=False, ) embed.add_field( name=f"/locale set", value=getMsg("help_cmd_locale", ctx.guild), inline=False ) if ctx.guild is None: embed.set_footer(text=getMsg("help_server", ctx.guild)) else: embed.set_footer(text=getMsg("help_notice_id", ctx.guild)) return embed async def guildConfigured(guild: Guild) -> str: output = {} config = loadJson("config.json") for kind in ["channel", "category"]: if guildConfGet(guild, kind) is not None: try: guild_object = utils.get(guild.categories, id=guildConfGet(guild, kind)) output[kind] = getMsg("configured_" + kind, guild).format( guild_object.name ) except Exception as exp: output[kind] = getMsg("unconfigured_" + kind, guild) else: output[kind] = getMsg("unconfigured_" + kind, guild) return getMsg("server_config", guild).format( getMsg("info_locale", guild).format(getMsg("locale_name", guild)), output["channel"], output["category"], )