From a3a0aaa72b2be510eb75d83d2c01c12a129171d2 Mon Sep 17 00:00:00 2001 From: profitroll Date: Tue, 17 Jan 2023 15:05:39 +0100 Subject: [PATCH] Tasty refactor --- functions.py | 196 +++++++++++++++++++++++---------------------------- yusarin.py | 46 ++++++------ 2 files changed, 111 insertions(+), 131 deletions(-) diff --git a/functions.py b/functions.py index 3ec5635..2d3d3ac 100644 --- a/functions.py +++ b/functions.py @@ -1,14 +1,12 @@ -import gzip -import os -import sys -import json -import shutil -import discord # type: ignore - +from json import loads, dumps +from gzip import open as gzipopen from datetime import datetime -from pathlib import Path +from os import listdir, makedirs, remove, stat, path +from shutil import copyfileobj +from typing import Any, Literal, Union +from discord import utils, Embed, TextChannel, VoiceChannel, PermissionOverwrite, Guild, CategoryChannel, Member, Client, ApplicationContext, Message -path = Path(__file__).resolve().parent +# path = Path(__file__).resolve().parent log_size = 512 @@ -19,7 +17,7 @@ debug = False try: with open("config.json", 'r', encoding="utf-8") as json_file: - output = json.load(json_file) + output = loads(json_file.read()) json_file.close() debug = output["debug"] except: @@ -27,30 +25,28 @@ except: # Check latest log size def checkSize(): - global path - path = str(path) try: - if not os.path.isdir(f"{path}/logs"): - os.mkdir(f"{path}/logs") + if not path.isdir("logs"): + makedirs("logs", exist_ok=True) - log = os.stat(path + '/logs/latest.log') + log = stat("logs/latest.log") global log_size if (log.st_size / 1024) > log_size: - with open(path + '/logs/latest.log', 'rb') as f_in: - with gzip.open(f'{path}/logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz', 'wb') as f_out: - shutil.copyfileobj(f_in, f_out) - print(f'Copied {path}/logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz') - open(path + '/logs/latest.log', 'w').close() + with open("logs/latest.log", "rb") as f_in: + with gzipopen(f'logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz', 'wb') as f_out: + copyfileobj(f_in, f_out) + print(f'Copied logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz') + open("logs/latest.log", "w").close() except FileNotFoundError: print('Not found') pass # Append string to log -def appendLog(message, guild=None, announce=True): +def appendLog(message: str, guild: Union[Guild, None] = None, announce=True) -> None: global debug - global path + if guild == None: message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] {message}' @@ -65,20 +61,20 @@ def appendLog(message, guild=None, announce=True): checkSize() - log = open(path + '/logs/latest.log', 'a') # type: ignore + log = open('logs/latest.log', 'a') # type: ignore log.write(f'{message_formatted}\n') log.close() -def saveJson(value, filename): +def saveJson(value: Any, filename: str) -> None: with open(filename, 'w', encoding="utf-8") as f: - json.dump(value, f, indent=4, ensure_ascii=False) + f.write(dumps(value, indent=4, ensure_ascii=False)) f.close() -def loadJson(filename): +def loadJson(filename: str) -> Any: global debug try: with open(filename, 'r', encoding="utf-8") as json_file: - output = json.load(json_file) + output = loads(json_file.read()) json_file.close() except Exception as exp: if debug: @@ -89,57 +85,53 @@ def loadJson(filename): def colorToStr(): pass -def strToColor(string): +def strToColor(string: str) -> int: return int(hex(int(string.replace("#", ""), 16)), 0) -def gotCommand(message): +def gotCommand(message: Message) -> None: global debug if debug: appendLog(f"Command '{message.content}' from {message.author} ({str(message.author.id)})", message.guild) else: appendLog(f"Command '{message.content}' from {message.author}", message.guild) -def guildConfGet(guild, variable): - global path +def guildConfGet(guild: Guild, variable: str) -> Any: global debug try: - config = loadJson(f"{path}/guilds/{str(guild.id)}/config.json") + config = loadJson(f"guilds/{str(guild.id)}/config.json") return config[variable] except Exception as exp: if debug: appendLog(f"Could not get guild config key '{variable}' due to {exp}", guild) return None -def guildConfSet(guild, variable, value): - global path - config = loadJson(f"{path}/guilds/{str(guild.id)}/config.json") +def guildConfSet(guild: Guild, variable: str, value: Any) -> None: + config = loadJson(f"guilds/{str(guild.id)}/config.json") config[variable] = value try: - saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json") + saveJson(config, f"guilds/{str(guild.id)}/config.json") except: - os.mkdir(f"{path}/guilds/{str(guild.id)}") - os.mkdir(f"{path}/guilds/{str(guild.id)}/channels") - saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json") + makedirs(f"guilds/{str(guild.id)}", exist_ok=True) + makedirs(f"guilds/{str(guild.id)}/channels", exist_ok=True) + saveJson(config, f"guilds/{str(guild.id)}/config.json") appendLog(f"Guild config key '{variable}' is now set to '{value}'", guild) -def guildConfReset(guild, variable): - global path +def guildConfReset(guild: Guild, variable: str) -> None: try: - config = loadJson(f"{path}/guilds/{str(guild.id)}/config.json") + config = loadJson(f"guilds/{str(guild.id)}/config.json") del config[variable] try: - saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json") + saveJson(config, f"guilds/{str(guild.id)}/config.json") except: - os.mkdir(f"{path}/guilds/{str(guild.id)}") - os.mkdir(f"{path}/guilds/{str(guild.id)}/channels") - saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json") + makedirs(f"guilds/{str(guild.id)}", exist_ok=True) + makedirs(f"guilds/{str(guild.id)}/channels", exist_ok=True) + saveJson(config, f"guilds/{str(guild.id)}/config.json") appendLog(f"Guild config key '{variable}' has been reset", guild) except Exception as exp: appendLog(f"Could not reset guild config key '{variable}' due to {exp}", guild) -def guildLocaleGet(guild): - global path - config = loadJson(f"{path}/config.json") +def guildLocaleGet(guild: Guild) -> str: + config = loadJson(f"config.json") try: locale = guildConfGet(guild, "locale") except: @@ -149,57 +141,54 @@ def guildLocaleGet(guild): else: return locale -def getMsg(string, guild=None): - global path - config = loadJson("config.json") +def getMsg(string: str, guild: Union[Guild, None] = None) -> str: try: - locale = loadJson(f'{path}/locale/{guildLocaleGet(guild)}.json') + locale = loadJson(f'locale/{guildLocaleGet(guild)}.json') return locale["messages"][string] except Exception as exp: appendLog(f"Could not get locale string named {string} due to exception {exp}", guild) return string -def makeEmbed(title="", description="", footer="", color=0xffffff): - embed=discord.Embed(title=title, description=description, color=color) +def makeEmbed(title="", description="", footer="", color=0xffffff) -> Embed: + embed=Embed(title=title, description=description, color=color) if footer is not None: embed.set_footer(text=footer) return embed -def channelExists(number, guild, type="Any"): +def channelExists(number: int, guild: Guild, type: Literal["Any", "Text", "Voice"] = "Any") -> bool: global debug if number == None: return False try: if type == "Voice": - selected_channel = discord.utils.get(guild.channels, id=number) - if isinstance(selected_channel, discord.VoiceChannel): + selected_channel = utils.get(guild.channels, id=number) + if isinstance(selected_channel, VoiceChannel): return True elif type == "Text": - selected_channel = discord.utils.get(guild.channels, id=number) - if isinstance(selected_channel, discord.TextChannel): + selected_channel = utils.get(guild.channels, id=number) + if isinstance(selected_channel, TextChannel): return True elif type == "Any": - selected_channel = discord.utils.get(guild.channels, id=number) + selected_channel = utils.get(guild.channels, id=number) return True except Exception as exp: if debug: appendLog(f"Channel ID {str(number)} is not a channel due to {exp}") return False -def channelGetName(number, guild): +def channelGetName(number: int, guild: Guild): global debug try: - selected_channel = discord.utils.get(guild.channels, id=number) + selected_channel = utils.get(guild.channels, id=number) return selected_channel.name except Exception as exp: if debug: appendLog(f"Channel ID {str(number)} is not a channel due to {exp}") return "Channel doesn't exist" -def isUserVoice(vc): - global path +def isUserVoice(vc: VoiceChannel) -> bool: try: - channels_list = os.listdir(f"{path}/guilds/{str(vc.guild.id)}/channels/") + channels_list = listdir(f"guilds/{str(vc.guild.id)}/channels/") if f"{str(vc.id)}.json" in channels_list: return True else: @@ -207,19 +196,18 @@ def isUserVoice(vc): except: return False -async def removeUserVoice(vc): - global path +async def removeUserVoice(vc: VoiceChannel) -> None: global debug - channels_list = os.listdir(f"{path}/guilds/{str(vc.guild.id)}/channels/") + channels_list = listdir(f"guilds/{str(vc.guild.id)}/channels/") if f"{vc.id}.json" in channels_list: - vc_file = f"{path}/guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" + vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" vc_conf = loadJson(vc_file) - needed_channel = discord.utils.get(vc.guild.channels, id=vc.id) + needed_channel = utils.get(vc.guild.channels, id=vc.id) if loadJson("config.json")["enable_nomic"]: - nomic_channel = discord.utils.get(vc.guild.channels, id=vc_conf["nomic"]) + nomic_channel = utils.get(vc.guild.channels, id=vc_conf["nomic"]) - os.remove(vc_file) + remove(vc_file) await needed_channel.delete() if debug: @@ -236,28 +224,27 @@ async def removeUserVoice(vc): else: return -async def createUserVoice(vc, category, member): - global path +async def createUserVoice(vc: VoiceChannel, category: CategoryChannel, member: Member) -> VoiceChannel: global debug chan = {} overwrites_channel = { - vc.guild.default_role: discord.PermissionOverwrite(view_channel=True), - vc.guild.me: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True), - member: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True) + vc.guild.default_role: PermissionOverwrite(view_channel=True), + vc.guild.me: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True), + member: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True) } overwrites_nomic = { - vc.guild.default_role: discord.PermissionOverwrite(view_channel=False, read_messages=False), - vc.guild.me: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True), - member: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True) + vc.guild.default_role: PermissionOverwrite(view_channel=False, read_messages=False), + vc.guild.me: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True), + member: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True) } created_channel = await vc.guild.create_voice_channel(getMsg("name_voice", vc.guild).format(member.name), category=category, overwrites=overwrites_channel) if debug: appendLog(f"Created voice channel '{created_channel}' ({str(created_channel.id)}) for user {member} ({str(member.id)})", guild=vc.guild) else: appendLog(f"Created voice channel '{created_channel}' for user {member}", guild=vc.guild) - if not os.path.isdir(f"{path}/guilds/{str(created_channel.guild.id)}/channels"): - os.mkdir(f"{path}/guilds/{str(created_channel.guild.id)}/channels") - vc_file = f"{path}/guilds/{str(created_channel.guild.id)}/channels/{str(created_channel.id)}.json" + if not path.isdir(f"guilds/{str(created_channel.guild.id)}/channels"): + makedirs(f"guilds/{str(created_channel.guild.id)}/channels", exist_ok=True) + vc_file = f"guilds/{str(created_channel.guild.id)}/channels/{str(created_channel.id)}.json" chan["ownerid"] = member.id saveJson(chan, vc_file) if loadJson("config.json")["enable_nomic"]: @@ -270,40 +257,37 @@ async def createUserVoice(vc, category, member): saveJson(chan, vc_file) return created_channel -def isVoiceOfUser(vc, member): - global path - vc_file = f"{path}/guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" +def isVoiceOfUser(vc: VoiceChannel, member: Member) -> bool: + vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" vc_conf = loadJson(vc_file) if vc_conf["ownerid"] == member.id: return True else: return False -async def changeNomicPerms(mode, vc, member): - global path - vc_file = f"{path}/guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" +async def changeNomicPerms(mode: Literal["deny", "allow"], vc: VoiceChannel, member: Member) -> None: + vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json" vc_conf = loadJson(vc_file) if loadJson("config.json")["enable_nomic"]: - nomic_channel = discord.utils.get(vc.guild.channels, id=vc_conf["nomic"]) + nomic_channel = utils.get(vc.guild.channels, id=vc_conf["nomic"]) if mode == "deny": await nomic_channel.set_permissions(member, view_channel=False) else: await nomic_channel.set_permissions(member, view_channel=True) -async def clearTrash(client): - global path - if not os.path.isdir(f"{path}/guilds/"): - os.mkdir(f"{path}/guilds") - guilds_list = os.listdir(f"{path}/guilds/") +async def clearTrash(client: Client): + if not path.isdir(f"guilds/"): + makedirs(f"guilds", exist_ok=True) + guilds_list = listdir(f"guilds/") for guild in guilds_list: guild_object = client.get_guild(int(guild)) - if os.path.isdir(f"{path}/guilds/{guild}/channels"): - channels_list = os.listdir(f"{path}/guilds/{guild}/channels/") + if path.isdir(f"guilds/{guild}/channels"): + channels_list = listdir(f"guilds/{guild}/channels/") for channel in channels_list: channel_id = channel[:-5] try: - selected_channel = discord.utils.get(guild_object.voice_channels, id=int(channel_id)) - channel_owner = loadJson(f"{path}/guilds/{guild}/channels/{channel}")["ownerid"] + selected_channel = utils.get(guild_object.voice_channels, id=int(channel_id)) + channel_owner = loadJson(f"guilds/{guild}/channels/{channel}")["ownerid"] remove_channel = True for member in selected_channel.members: if member.id == channel_owner: @@ -311,12 +295,12 @@ async def clearTrash(client): if remove_channel: await removeUserVoice(selected_channel) except: - os.remove(f"{path}/guilds/{guild}/channels/{channel_id}.json") + remove(f"guilds/{guild}/channels/{channel_id}.json") #async def autoClearTrash(client): # execute clearTrash every 120 seconds -def getHelpMessage(ctx, version): +def getHelpMessage(ctx: ApplicationContext, version: float) -> Embed: #channelExists(number, guild, type="Voice") @@ -337,9 +321,9 @@ def getHelpMessage(ctx, version): description = "\n".join([desc_locale, desc_channel, desc_category]) - embed=discord.Embed(title=getMsg("help_title", ctx.guild), description=description, color=strToColor(config["color_default"])) + embed=Embed(title=getMsg("help_title", ctx.guild), description=description, color=strToColor(config["color_default"])) else: - embed=discord.Embed(title=getMsg("help_title_dm", ctx.guild), color=strToColor(config["color_default"])) + embed=Embed(title=getMsg("help_title_dm", ctx.guild), color=strToColor(config["color_default"])) embed.set_author(name=f'{config["bot_name"]} v{str(version)}', url=config["bot_site"], icon_url=config["bot_icon"]) @@ -357,7 +341,7 @@ def getHelpMessage(ctx, version): return embed -async def guildConfigured(guild): +async def guildConfigured(guild: Guild) -> str: output = {} config = loadJson("config.json") @@ -365,7 +349,7 @@ async def guildConfigured(guild): for kind in ["channel", "category"]: if guildConfGet(guild, kind) is not None: try: - guild_object = discord.utils.get(guild.categories, id=guildConfGet(guild, kind)) + guild_object = utils.get(guild.categories, id=guildConfGet(guild, kind)) output[kind] = getMsg("configured_"+kind, guild).format(guild_object.name) except Exception as exp: output[kind] = getMsg("unconfigured_"+kind, guild) diff --git a/yusarin.py b/yusarin.py index 5cc5534..b6ff69f 100644 --- a/yusarin.py +++ b/yusarin.py @@ -1,28 +1,24 @@ -import os -import sys -import json -import shutil -import requests # type: ignore -import threading - try: - import discord # type: ignore - from discord import ApplicationContext, Option, Intents # type: ignore + from requests import get + from discord import ApplicationContext, Option, Intents, Bot, ActivityType, Activity except Exception as exp: - print(f"Module py-cord is not installed. Make sure to run 'pip install -r requirements.txt' before first start") - sys.exit() + print(f"Dependencies not installed. Make sure to run 'pip install -r requirements.txt' before first start") + exit() +from os import getpid, system +from shutil import rmtree from functions import * -pid = os.getpid() -version = 1.7 + +pid = getpid() +version = 1.8 if loadJson("config.json")["owner"] == "SET-OWNER-ID" or loadJson("config.json")["bot_token"] == "SET-BOT-TOKEN": print(f"Bot is not correctly configured.\nMake sure you've set up owner id and bot token in {path}/config.json\nLearn more here: https://git.end-play.xyz/profitroll/YusarinBot") - sys.exit() + exit() if loadJson("config.json")["check_for_updates"]: try: - serv_ver = json.loads(requests.get("https://api.end-play.xyz/version?app=yusarinbot&apikey=publickey").text)["version"] + serv_ver = loads(get("https://api.end-play.xyz/version?app=yusarinbot&apikey=publickey").text)["version"] if float(serv_ver) > version: appendLog(f"YusarinBot version {serv_ver} is available. Download new version here: https://git.end-play.xyz/profitroll/YusarinBot/releases/latest") appendLog(f"Currently using YusarinBot v{str(version)}") @@ -30,7 +26,7 @@ if loadJson("config.json")["check_for_updates"]: appendLog(f"Could not get YusarinBot cloud version due to {exp}. Currently using {str(version)}") intents = Intents().all() -client = discord.Bot(intents=intents) +client = Bot(intents=intents) @client.event async def on_ready(): @@ -39,7 +35,7 @@ async def on_ready(): config = loadJson("config.json") - await client.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=config["bot_activity"])) + await client.change_presence(activity=Activity(type=ActivityType.listening, name=config["bot_activity"])) await clearTrash(client) @@ -48,8 +44,8 @@ async def on_guild_join(guild): global path - os.mkdir(f"{path}/guilds/{str(guild.id)}") - os.mkdir(f"{path}/guilds/{str(guild.id)}/channels") + makedirs(f"{path}/guilds/{str(guild.id)}", exist_ok=True) + makedirs(f"{path}/guilds/{str(guild.id)}/channels", exist_ok=True) saveJson({}, f"{path}/guilds/{str(guild.id)}/config.json") appendLog(f"Joined guild '{guild}' with id {str(guild.id)}") @@ -60,7 +56,7 @@ async def on_guild_remove(guild): global path try: - shutil.rmtree(f"{path}/guilds/{str(guild.id)}") + rmtree(f"{path}/guilds/{str(guild.id)}") except: pass @@ -99,7 +95,7 @@ async def on_voice_state_update(member, before, after): await changeNomicPerms("allow", vc_to, member) if vc_to.id == guildConfGet(vc_to.guild, "channel"): if guildConfGet(vc_to.guild, "category") is not None: - voice_chan = await createUserVoice(vc_to, discord.utils.get(vc_to.guild.categories, id=guildConfGet(vc_to.guild, "category")), member) + voice_chan = await createUserVoice(vc_to, utils.get(vc_to.guild.categories, id=guildConfGet(vc_to.guild, "category")), member) try: await member.move_to(voice_chan) except: @@ -116,7 +112,7 @@ async def shutdown(ctx: ApplicationContext): config = loadJson("config.json") if ctx.author.id == config["owner"]: await ctx.respond(embed=makeEmbed(description=getMsg("shutdown", ctx.guild).format(ctx.author), color=strToColor(config["color_default"]))) - os.system(f"kill -9 {str(pid)}") + system(f"kill -9 {str(pid)}") else: await ctx.respond(embed=makeEmbed(title=getMsg("admin_title", ctx.guild), description=getMsg("admin_description", ctx.guild), color=strToColor(config["color_error"]))) #========================================================================================================================= @@ -131,7 +127,7 @@ async def help(ctx: ApplicationContext): locale = client.create_group("locale", "Commands related to bot's locale") valid_locales = [] -files_locales = os.listdir(f"{path}/locale/") +files_locales = listdir(f"{path}/locale/") for entry in files_locales: valid_locales.append(".".join(entry.split(".")[:-1])) @@ -139,13 +135,13 @@ for entry in files_locales: async def locale_set(ctx: ApplicationContext, language: Option(str, "One of the languages in list", choices=valid_locales)): # type: ignore config = loadJson("config.json") if ctx.guild is not None: - if language+".json" in os.listdir(f"{path}/locale/"): + if language+".json" in listdir(f"{path}/locale/"): guildConfSet(ctx.guild, "locale", language) appendLog(f"Server's locale is now set to {language}", ctx.guild) await ctx.respond(embed=makeEmbed(title=getMsg("set_locale_title", ctx.guild), description=getMsg("set_locale_description", ctx.guild).format(getMsg("locale_name", ctx.guild)), color=strToColor(config["color_ok"]))) else: valid_locales = [] - files_locales = os.listdir(f"{path}/locale/") + files_locales = listdir(f"{path}/locale/") for entry in files_locales: valid_locales.append(entry.split(".")[:-1]) await ctx.respond(embed=makeEmbed(title=getMsg("error_locale_title", ctx.guild), description=getMsg("error_locale_description", ctx.guild).format(", ".join(valid_locales)), color=strToColor(config["color_error"])))