Tasty refactor

This commit is contained in:
Profitroll 2023-01-17 15:05:39 +01:00
parent 78239c7e5d
commit a3a0aaa72b
2 changed files with 111 additions and 131 deletions

View File

@ -1,14 +1,12 @@
import gzip
import os
import sys
import json
import shutil
import discord # type: ignore
from json import loads, dumps
from gzip import open as gzipopen
from datetime import datetime
from pathlib import Path
from os import listdir, makedirs, remove, stat, path
from shutil import copyfileobj
from typing import Any, Literal, Union
from discord import utils, Embed, TextChannel, VoiceChannel, PermissionOverwrite, Guild, CategoryChannel, Member, Client, ApplicationContext, Message
path = Path(__file__).resolve().parent
# path = Path(__file__).resolve().parent
log_size = 512
@ -19,7 +17,7 @@ debug = False
try:
with open("config.json", 'r', encoding="utf-8") as json_file:
output = json.load(json_file)
output = loads(json_file.read())
json_file.close()
debug = output["debug"]
except:
@ -27,30 +25,28 @@ except:
# Check latest log size
def checkSize():
global path
path = str(path)
try:
if not os.path.isdir(f"{path}/logs"):
os.mkdir(f"{path}/logs")
if not path.isdir("logs"):
makedirs("logs", exist_ok=True)
log = os.stat(path + '/logs/latest.log')
log = stat("logs/latest.log")
global log_size
if (log.st_size / 1024) > log_size:
with open(path + '/logs/latest.log', 'rb') as f_in:
with gzip.open(f'{path}/logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
print(f'Copied {path}/logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz')
open(path + '/logs/latest.log', 'w').close()
with open("logs/latest.log", "rb") as f_in:
with gzipopen(f'logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz', 'wb') as f_out:
copyfileobj(f_in, f_out)
print(f'Copied logs/{datetime.now().strftime("%d.%m.%Y_%H:%M:%S")}.log.gz')
open("logs/latest.log", "w").close()
except FileNotFoundError:
print('Not found')
pass
# Append string to log
def appendLog(message, guild=None, announce=True):
def appendLog(message: str, guild: Union[Guild, None] = None, announce=True) -> None:
global debug
global path
if guild == None:
message_formatted = f'[{datetime.now().strftime("%d.%m.%Y")}] [{datetime.now().strftime("%H:%M:%S")}] {message}'
@ -65,20 +61,20 @@ def appendLog(message, guild=None, announce=True):
checkSize()
log = open(path + '/logs/latest.log', 'a') # type: ignore
log = open('logs/latest.log', 'a') # type: ignore
log.write(f'{message_formatted}\n')
log.close()
def saveJson(value, filename):
def saveJson(value: Any, filename: str) -> None:
with open(filename, 'w', encoding="utf-8") as f:
json.dump(value, f, indent=4, ensure_ascii=False)
f.write(dumps(value, indent=4, ensure_ascii=False))
f.close()
def loadJson(filename):
def loadJson(filename: str) -> Any:
global debug
try:
with open(filename, 'r', encoding="utf-8") as json_file:
output = json.load(json_file)
output = loads(json_file.read())
json_file.close()
except Exception as exp:
if debug:
@ -89,57 +85,53 @@ def loadJson(filename):
def colorToStr():
pass
def strToColor(string):
def strToColor(string: str) -> int:
return int(hex(int(string.replace("#", ""), 16)), 0)
def gotCommand(message):
def gotCommand(message: Message) -> None:
global debug
if debug:
appendLog(f"Command '{message.content}' from {message.author} ({str(message.author.id)})", message.guild)
else:
appendLog(f"Command '{message.content}' from {message.author}", message.guild)
def guildConfGet(guild, variable):
global path
def guildConfGet(guild: Guild, variable: str) -> Any:
global debug
try:
config = loadJson(f"{path}/guilds/{str(guild.id)}/config.json")
config = loadJson(f"guilds/{str(guild.id)}/config.json")
return config[variable]
except Exception as exp:
if debug:
appendLog(f"Could not get guild config key '{variable}' due to {exp}", guild)
return None
def guildConfSet(guild, variable, value):
global path
config = loadJson(f"{path}/guilds/{str(guild.id)}/config.json")
def guildConfSet(guild: Guild, variable: str, value: Any) -> None:
config = loadJson(f"guilds/{str(guild.id)}/config.json")
config[variable] = value
try:
saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json")
saveJson(config, f"guilds/{str(guild.id)}/config.json")
except:
os.mkdir(f"{path}/guilds/{str(guild.id)}")
os.mkdir(f"{path}/guilds/{str(guild.id)}/channels")
saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json")
makedirs(f"guilds/{str(guild.id)}", exist_ok=True)
makedirs(f"guilds/{str(guild.id)}/channels", exist_ok=True)
saveJson(config, f"guilds/{str(guild.id)}/config.json")
appendLog(f"Guild config key '{variable}' is now set to '{value}'", guild)
def guildConfReset(guild, variable):
global path
def guildConfReset(guild: Guild, variable: str) -> None:
try:
config = loadJson(f"{path}/guilds/{str(guild.id)}/config.json")
config = loadJson(f"guilds/{str(guild.id)}/config.json")
del config[variable]
try:
saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json")
saveJson(config, f"guilds/{str(guild.id)}/config.json")
except:
os.mkdir(f"{path}/guilds/{str(guild.id)}")
os.mkdir(f"{path}/guilds/{str(guild.id)}/channels")
saveJson(config, f"{path}/guilds/{str(guild.id)}/config.json")
makedirs(f"guilds/{str(guild.id)}", exist_ok=True)
makedirs(f"guilds/{str(guild.id)}/channels", exist_ok=True)
saveJson(config, f"guilds/{str(guild.id)}/config.json")
appendLog(f"Guild config key '{variable}' has been reset", guild)
except Exception as exp:
appendLog(f"Could not reset guild config key '{variable}' due to {exp}", guild)
def guildLocaleGet(guild):
global path
config = loadJson(f"{path}/config.json")
def guildLocaleGet(guild: Guild) -> str:
config = loadJson(f"config.json")
try:
locale = guildConfGet(guild, "locale")
except:
@ -149,57 +141,54 @@ def guildLocaleGet(guild):
else:
return locale
def getMsg(string, guild=None):
global path
config = loadJson("config.json")
def getMsg(string: str, guild: Union[Guild, None] = None) -> str:
try:
locale = loadJson(f'{path}/locale/{guildLocaleGet(guild)}.json')
locale = loadJson(f'locale/{guildLocaleGet(guild)}.json')
return locale["messages"][string]
except Exception as exp:
appendLog(f"Could not get locale string named {string} due to exception {exp}", guild)
return string
def makeEmbed(title="", description="", footer="", color=0xffffff):
embed=discord.Embed(title=title, description=description, color=color)
def makeEmbed(title="", description="", footer="", color=0xffffff) -> Embed:
embed=Embed(title=title, description=description, color=color)
if footer is not None:
embed.set_footer(text=footer)
return embed
def channelExists(number, guild, type="Any"):
def channelExists(number: int, guild: Guild, type: Literal["Any", "Text", "Voice"] = "Any") -> bool:
global debug
if number == None:
return False
try:
if type == "Voice":
selected_channel = discord.utils.get(guild.channels, id=number)
if isinstance(selected_channel, discord.VoiceChannel):
selected_channel = utils.get(guild.channels, id=number)
if isinstance(selected_channel, VoiceChannel):
return True
elif type == "Text":
selected_channel = discord.utils.get(guild.channels, id=number)
if isinstance(selected_channel, discord.TextChannel):
selected_channel = utils.get(guild.channels, id=number)
if isinstance(selected_channel, TextChannel):
return True
elif type == "Any":
selected_channel = discord.utils.get(guild.channels, id=number)
selected_channel = utils.get(guild.channels, id=number)
return True
except Exception as exp:
if debug:
appendLog(f"Channel ID {str(number)} is not a channel due to {exp}")
return False
def channelGetName(number, guild):
def channelGetName(number: int, guild: Guild):
global debug
try:
selected_channel = discord.utils.get(guild.channels, id=number)
selected_channel = utils.get(guild.channels, id=number)
return selected_channel.name
except Exception as exp:
if debug:
appendLog(f"Channel ID {str(number)} is not a channel due to {exp}")
return "Channel doesn't exist"
def isUserVoice(vc):
global path
def isUserVoice(vc: VoiceChannel) -> bool:
try:
channels_list = os.listdir(f"{path}/guilds/{str(vc.guild.id)}/channels/")
channels_list = listdir(f"guilds/{str(vc.guild.id)}/channels/")
if f"{str(vc.id)}.json" in channels_list:
return True
else:
@ -207,19 +196,18 @@ def isUserVoice(vc):
except:
return False
async def removeUserVoice(vc):
global path
async def removeUserVoice(vc: VoiceChannel) -> None:
global debug
channels_list = os.listdir(f"{path}/guilds/{str(vc.guild.id)}/channels/")
channels_list = listdir(f"guilds/{str(vc.guild.id)}/channels/")
if f"{vc.id}.json" in channels_list:
vc_file = f"{path}/guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json"
vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json"
vc_conf = loadJson(vc_file)
needed_channel = discord.utils.get(vc.guild.channels, id=vc.id)
needed_channel = utils.get(vc.guild.channels, id=vc.id)
if loadJson("config.json")["enable_nomic"]:
nomic_channel = discord.utils.get(vc.guild.channels, id=vc_conf["nomic"])
nomic_channel = utils.get(vc.guild.channels, id=vc_conf["nomic"])
os.remove(vc_file)
remove(vc_file)
await needed_channel.delete()
if debug:
@ -236,28 +224,27 @@ async def removeUserVoice(vc):
else:
return
async def createUserVoice(vc, category, member):
global path
async def createUserVoice(vc: VoiceChannel, category: CategoryChannel, member: Member) -> VoiceChannel:
global debug
chan = {}
overwrites_channel = {
vc.guild.default_role: discord.PermissionOverwrite(view_channel=True),
vc.guild.me: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True),
member: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True)
vc.guild.default_role: PermissionOverwrite(view_channel=True),
vc.guild.me: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True),
member: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True)
}
overwrites_nomic = {
vc.guild.default_role: discord.PermissionOverwrite(view_channel=False, read_messages=False),
vc.guild.me: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True),
member: discord.PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True)
vc.guild.default_role: PermissionOverwrite(view_channel=False, read_messages=False),
vc.guild.me: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True),
member: PermissionOverwrite(read_messages=True, view_channel=True, manage_channels=True)
}
created_channel = await vc.guild.create_voice_channel(getMsg("name_voice", vc.guild).format(member.name), category=category, overwrites=overwrites_channel)
if debug:
appendLog(f"Created voice channel '{created_channel}' ({str(created_channel.id)}) for user {member} ({str(member.id)})", guild=vc.guild)
else:
appendLog(f"Created voice channel '{created_channel}' for user {member}", guild=vc.guild)
if not os.path.isdir(f"{path}/guilds/{str(created_channel.guild.id)}/channels"):
os.mkdir(f"{path}/guilds/{str(created_channel.guild.id)}/channels")
vc_file = f"{path}/guilds/{str(created_channel.guild.id)}/channels/{str(created_channel.id)}.json"
if not path.isdir(f"guilds/{str(created_channel.guild.id)}/channels"):
makedirs(f"guilds/{str(created_channel.guild.id)}/channels", exist_ok=True)
vc_file = f"guilds/{str(created_channel.guild.id)}/channels/{str(created_channel.id)}.json"
chan["ownerid"] = member.id
saveJson(chan, vc_file)
if loadJson("config.json")["enable_nomic"]:
@ -270,40 +257,37 @@ async def createUserVoice(vc, category, member):
saveJson(chan, vc_file)
return created_channel
def isVoiceOfUser(vc, member):
global path
vc_file = f"{path}/guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json"
def isVoiceOfUser(vc: VoiceChannel, member: Member) -> bool:
vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json"
vc_conf = loadJson(vc_file)
if vc_conf["ownerid"] == member.id:
return True
else:
return False
async def changeNomicPerms(mode, vc, member):
global path
vc_file = f"{path}/guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json"
async def changeNomicPerms(mode: Literal["deny", "allow"], vc: VoiceChannel, member: Member) -> None:
vc_file = f"guilds/{str(vc.guild.id)}/channels/{str(vc.id)}.json"
vc_conf = loadJson(vc_file)
if loadJson("config.json")["enable_nomic"]:
nomic_channel = discord.utils.get(vc.guild.channels, id=vc_conf["nomic"])
nomic_channel = utils.get(vc.guild.channels, id=vc_conf["nomic"])
if mode == "deny":
await nomic_channel.set_permissions(member, view_channel=False)
else:
await nomic_channel.set_permissions(member, view_channel=True)
async def clearTrash(client):
global path
if not os.path.isdir(f"{path}/guilds/"):
os.mkdir(f"{path}/guilds")
guilds_list = os.listdir(f"{path}/guilds/")
async def clearTrash(client: Client):
if not path.isdir(f"guilds/"):
makedirs(f"guilds", exist_ok=True)
guilds_list = listdir(f"guilds/")
for guild in guilds_list:
guild_object = client.get_guild(int(guild))
if os.path.isdir(f"{path}/guilds/{guild}/channels"):
channels_list = os.listdir(f"{path}/guilds/{guild}/channels/")
if path.isdir(f"guilds/{guild}/channels"):
channels_list = listdir(f"guilds/{guild}/channels/")
for channel in channels_list:
channel_id = channel[:-5]
try:
selected_channel = discord.utils.get(guild_object.voice_channels, id=int(channel_id))
channel_owner = loadJson(f"{path}/guilds/{guild}/channels/{channel}")["ownerid"]
selected_channel = utils.get(guild_object.voice_channels, id=int(channel_id))
channel_owner = loadJson(f"guilds/{guild}/channels/{channel}")["ownerid"]
remove_channel = True
for member in selected_channel.members:
if member.id == channel_owner:
@ -311,12 +295,12 @@ async def clearTrash(client):
if remove_channel:
await removeUserVoice(selected_channel)
except:
os.remove(f"{path}/guilds/{guild}/channels/{channel_id}.json")
remove(f"guilds/{guild}/channels/{channel_id}.json")
#async def autoClearTrash(client):
# execute clearTrash every 120 seconds
def getHelpMessage(ctx, version):
def getHelpMessage(ctx: ApplicationContext, version: float) -> Embed:
#channelExists(number, guild, type="Voice")
@ -337,9 +321,9 @@ def getHelpMessage(ctx, version):
description = "\n".join([desc_locale, desc_channel, desc_category])
embed=discord.Embed(title=getMsg("help_title", ctx.guild), description=description, color=strToColor(config["color_default"]))
embed=Embed(title=getMsg("help_title", ctx.guild), description=description, color=strToColor(config["color_default"]))
else:
embed=discord.Embed(title=getMsg("help_title_dm", ctx.guild), color=strToColor(config["color_default"]))
embed=Embed(title=getMsg("help_title_dm", ctx.guild), color=strToColor(config["color_default"]))
embed.set_author(name=f'{config["bot_name"]} v{str(version)}', url=config["bot_site"], icon_url=config["bot_icon"])
@ -357,7 +341,7 @@ def getHelpMessage(ctx, version):
return embed
async def guildConfigured(guild):
async def guildConfigured(guild: Guild) -> str:
output = {}
config = loadJson("config.json")
@ -365,7 +349,7 @@ async def guildConfigured(guild):
for kind in ["channel", "category"]:
if guildConfGet(guild, kind) is not None:
try:
guild_object = discord.utils.get(guild.categories, id=guildConfGet(guild, kind))
guild_object = utils.get(guild.categories, id=guildConfGet(guild, kind))
output[kind] = getMsg("configured_"+kind, guild).format(guild_object.name)
except Exception as exp:
output[kind] = getMsg("unconfigured_"+kind, guild)

View File

@ -1,28 +1,24 @@
import os
import sys
import json
import shutil
import requests # type: ignore
import threading
try:
import discord # type: ignore
from discord import ApplicationContext, Option, Intents # type: ignore
from requests import get
from discord import ApplicationContext, Option, Intents, Bot, ActivityType, Activity
except Exception as exp:
print(f"Module py-cord is not installed. Make sure to run 'pip install -r requirements.txt' before first start")
sys.exit()
print(f"Dependencies not installed. Make sure to run 'pip install -r requirements.txt' before first start")
exit()
from os import getpid, system
from shutil import rmtree
from functions import *
pid = os.getpid()
version = 1.7
pid = getpid()
version = 1.8
if loadJson("config.json")["owner"] == "SET-OWNER-ID" or loadJson("config.json")["bot_token"] == "SET-BOT-TOKEN":
print(f"Bot is not correctly configured.\nMake sure you've set up owner id and bot token in {path}/config.json\nLearn more here: https://git.end-play.xyz/profitroll/YusarinBot")
sys.exit()
exit()
if loadJson("config.json")["check_for_updates"]:
try:
serv_ver = json.loads(requests.get("https://api.end-play.xyz/version?app=yusarinbot&apikey=publickey").text)["version"]
serv_ver = loads(get("https://api.end-play.xyz/version?app=yusarinbot&apikey=publickey").text)["version"]
if float(serv_ver) > version:
appendLog(f"YusarinBot version {serv_ver} is available. Download new version here: https://git.end-play.xyz/profitroll/YusarinBot/releases/latest")
appendLog(f"Currently using YusarinBot v{str(version)}")
@ -30,7 +26,7 @@ if loadJson("config.json")["check_for_updates"]:
appendLog(f"Could not get YusarinBot cloud version due to {exp}. Currently using {str(version)}")
intents = Intents().all()
client = discord.Bot(intents=intents)
client = Bot(intents=intents)
@client.event
async def on_ready():
@ -39,7 +35,7 @@ async def on_ready():
config = loadJson("config.json")
await client.change_presence(activity=discord.Activity(type=discord.ActivityType.listening, name=config["bot_activity"]))
await client.change_presence(activity=Activity(type=ActivityType.listening, name=config["bot_activity"]))
await clearTrash(client)
@ -48,8 +44,8 @@ async def on_guild_join(guild):
global path
os.mkdir(f"{path}/guilds/{str(guild.id)}")
os.mkdir(f"{path}/guilds/{str(guild.id)}/channels")
makedirs(f"{path}/guilds/{str(guild.id)}", exist_ok=True)
makedirs(f"{path}/guilds/{str(guild.id)}/channels", exist_ok=True)
saveJson({}, f"{path}/guilds/{str(guild.id)}/config.json")
appendLog(f"Joined guild '{guild}' with id {str(guild.id)}")
@ -60,7 +56,7 @@ async def on_guild_remove(guild):
global path
try:
shutil.rmtree(f"{path}/guilds/{str(guild.id)}")
rmtree(f"{path}/guilds/{str(guild.id)}")
except:
pass
@ -99,7 +95,7 @@ async def on_voice_state_update(member, before, after):
await changeNomicPerms("allow", vc_to, member)
if vc_to.id == guildConfGet(vc_to.guild, "channel"):
if guildConfGet(vc_to.guild, "category") is not None:
voice_chan = await createUserVoice(vc_to, discord.utils.get(vc_to.guild.categories, id=guildConfGet(vc_to.guild, "category")), member)
voice_chan = await createUserVoice(vc_to, utils.get(vc_to.guild.categories, id=guildConfGet(vc_to.guild, "category")), member)
try:
await member.move_to(voice_chan)
except:
@ -116,7 +112,7 @@ async def shutdown(ctx: ApplicationContext):
config = loadJson("config.json")
if ctx.author.id == config["owner"]:
await ctx.respond(embed=makeEmbed(description=getMsg("shutdown", ctx.guild).format(ctx.author), color=strToColor(config["color_default"])))
os.system(f"kill -9 {str(pid)}")
system(f"kill -9 {str(pid)}")
else:
await ctx.respond(embed=makeEmbed(title=getMsg("admin_title", ctx.guild), description=getMsg("admin_description", ctx.guild), color=strToColor(config["color_error"])))
#=========================================================================================================================
@ -131,7 +127,7 @@ async def help(ctx: ApplicationContext):
locale = client.create_group("locale", "Commands related to bot's locale")
valid_locales = []
files_locales = os.listdir(f"{path}/locale/")
files_locales = listdir(f"{path}/locale/")
for entry in files_locales:
valid_locales.append(".".join(entry.split(".")[:-1]))
@ -139,13 +135,13 @@ for entry in files_locales:
async def locale_set(ctx: ApplicationContext, language: Option(str, "One of the languages in list", choices=valid_locales)): # type: ignore
config = loadJson("config.json")
if ctx.guild is not None:
if language+".json" in os.listdir(f"{path}/locale/"):
if language+".json" in listdir(f"{path}/locale/"):
guildConfSet(ctx.guild, "locale", language)
appendLog(f"Server's locale is now set to {language}", ctx.guild)
await ctx.respond(embed=makeEmbed(title=getMsg("set_locale_title", ctx.guild), description=getMsg("set_locale_description", ctx.guild).format(getMsg("locale_name", ctx.guild)), color=strToColor(config["color_ok"])))
else:
valid_locales = []
files_locales = os.listdir(f"{path}/locale/")
files_locales = listdir(f"{path}/locale/")
for entry in files_locales:
valid_locales.append(entry.split(".")[:-1])
await ctx.respond(embed=makeEmbed(title=getMsg("error_locale_title", ctx.guild), description=getMsg("error_locale_description", ctx.guild).format(", ".join(valid_locales)), color=strToColor(config["color_error"])))