This repository has been archived on 2024-10-14. You can view files and clone it, but cannot push or open issues or pull requests.
AutoZoom/modules/functions.py
2022-04-28 01:18:08 +02:00

604 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
from socket import send_fds
import time
import json
import os
import shutil
import gzip
import getpass
import keyboard
from modules.telegram import telegramSendText
from datetime import datetime
from pathlib import Path
import asyncio, threading
from subprocess import check_output
# Directory that contains this module; the resource folders below live inside it.
path = Path(__file__).resolve().parent

# Resource directories as plain strings that always end with the OS path
# separator, so a file name can be appended directly.
sounds_folder = f"{path / 'sounds'}{os.sep}"
files_folder = f"{path / 'files'}{os.sep}"
logs_folder = f"{path / 'logs'}{os.sep}"

# Spellings treated as "yes" / "no" answers (their use is outside this section).
yes_list = ['y', 'yes', 'т', 'так', 'j', 'ja']
no_list = ['n', 'no', 'н', 'ні', 'nein']
# Default configuration tree. It is written to config.json when that file does
# not exist yet and is used as the fallback source for missing keys.
default_config = {
    "firstrun": True,
    "debug": False,
    "update_check": True,
    "logging": {"enabled": True, "rotate_size": 512},
    "meetings": {"remove_old": True},
    "meeting_end": {
        "mode": "shutdown",
        "shutdown": {"timeout": 30},
    },
    "obs": {
        "enabled": False,
        "path_bin": None,
        "path_core": None,
        "delay": 10,
        "video": {"send": False, "path": None, "filename": None},
        "keybinds": {"record_start": "shift+f7", "record_stop": "shift+f8"},
    },
    "telegram": {"enabled": False, "token": None, "user_id": None},
    "appearance": {"theme": "dark", "colors": True, "fullscreen": False},
    "rpc": {"enabled": True, "app_id": "800049969960058882"},
    "sounds": {
        "enabled": True,
        "sounds": {
            "meeting_ended": "ended",
            "record_start": "recordstart",
            "record_stop": "recordstop",
            "shutdown": "shutdown",
            "meeting_started": "started",
            "meeting_warning": "warning",
        },
    },
    # Every bind event starts with the same three empty lists; a fresh dict and
    # fresh lists are created per event so the entries never share objects.
    "binds": {
        event: {"commands": [], "keymaps": [], "messages": []}
        for event in (
            "app_start",
            "app_end",
            "queue_start",
            "queue_end",
            "meeting_start",
            "meeting_end",
        )
    },
    # Flat key names kept for reference. They look like an earlier, non-nested
    # layout of this config (TODO confirm); getConfig() calls elsewhere in this
    # file still use some of them, e.g. "log_size" and "write_logs".
    # "start": "shift+f7",
    # "stop": "shift+f8",
    # "telegram_enabled": False,
    # "use_colors": True,
    # "run_fullscreen": False,
    # "rpc_use": True,
    # "rpc_id": "800049969960058882",
    # "sounds": True,
    # "end_mode": "shutdown",
    # "obs_exe": None,
    # "obs_core": None,
    # "obs_delay": 10,
    # "write_logs": True,
    # "log_size": 512,
    # "sound_ended": "ended",
    # "sound_recordstart": "recordstart",
    # "sound_recordstop": "recordstop",
    # "sound_shutdown": "shutdown",
    # "sound_started": "started",
    # "sound_warning": "warning"
    # "shutdown_timeout": 30,
    # "shutdown_enabled": False,
}
# Return the "Windows only" notice
def winOnly(color, reset, system, start='', end=''):
    """Build the "Windows only!" notice for non-Windows systems.

    Args:
        color: Text inserted right before the notice.
        reset: Text inserted right after the notice.
        system: OS name; the notice is produced for anything but 'windows'.
        start: Optional prefix placed before ``color``.
        end: Optional suffix placed after ``reset``.

    Returns:
        The decorated notice, or an empty string when ``system`` is 'windows'.
    """
    if system == 'windows':
        return ""
    return f"{start}{color}Только для Windows!{reset}{end}"
# Return the OS type
def getOS():
    """Classify the host system.

    Returns:
        "windows" when ``os.name`` is 'nt'; otherwise "android" when the
        lower-cased output of ``uname -a`` contains "android", else "unix".
    """
    if os.name == 'nt':
        return "windows"
    # uname is only consulted on non-Windows hosts; the output stays as bytes.
    kernel_info = check_output('uname -a', shell=True).lower()
    return "android" if b'android' in kernel_info else "unix"
# Clear the command-line screen. The command is chosen once, when the module
# is imported, based on the detected OS.
if getOS() == "windows":
    def clear():
        """Clear the console with ``cls`` and return os.system()'s result."""
        return os.system('cls')
else:
    def clear():
        """Clear the console with ``clear`` and return os.system()'s result."""
        return os.system('clear')
# Import the sound playback backends. They are optional: when an import fails
# the name simply stays undefined, and playSound() catches the resulting
# error at call time and logs it.
try:
    # Detect the OS once; on non-Windows hosts getOS() spawns a subprocess.
    _sound_os = getOS()
    if _sound_os == "windows":
        import winsound
    if _sound_os in ("windows", "unix"):
        from playsound import playsound
except Exception:
    # Best effort on purpose, but no longer a bare "except:" so that
    # KeyboardInterrupt / SystemExit are not swallowed during import.
    pass
# Set the cmd.exe window title
def setTitle(title, system):
    """Set the console window title via the ``title`` shell command.

    Does nothing unless ``system`` is "windows".

    Args:
        title: Text interpolated into the ``title`` command.
        system: OS name.

    Returns:
        None.
    """
    if system != "windows":
        return
    try:
        os.system(f"title {title}")
    except Exception:
        # Changing the title is cosmetic, so ordinary failures are ignored.
        # KeyboardInterrupt / SystemExit are allowed to propagate.
        pass
# Get the weekday number
def getDayNum(day):
    """Return the ISO weekday (1 = Monday ... 7 = Sunday) of a date string.

    Args:
        day: Date formatted as "DD.MM.YYYY".

    Raises:
        ValueError: If ``day`` does not match the "%d.%m.%Y" format.
    """
    return datetime.strptime(day, "%d.%m.%Y").isoweekday()
# Check the log file size and rotate the log when it is too large
def checkSize():
    """Rotate ``latest.log`` once it exceeds the configured size.

    When the log is larger than the threshold (compared in KiB) it is
    gzip-compressed into a timestamped archive in the logs folder and then
    truncated. When the log does not exist yet, the logs folder and an empty
    log file are created and the check is retried once.

    Fixes over the previous version: the source log is opened in plain binary
    mode (``open(..., 'rb', encoding=...)`` always raised ValueError, so
    rotation could never succeed), the archive name no longer contains ':'
    (not allowed in Windows file names), file handles are closed, and a
    non-numeric threshold skips rotation instead of raising TypeError.
    """
    log_path = logs_folder + 'latest.log'
    for _attempt in range(2):
        try:
            size_kib = os.stat(log_path).st_size / 1024
        except FileNotFoundError:
            if getConfig("debug"):
                print('Log file not found')
                time.sleep(2)
            try:
                os.makedirs(logs_folder, exist_ok=True)
                open(log_path, 'a', encoding='utf-8').close()
            except OSError:
                if getConfig("debug"):
                    time.sleep(2)
                    print('Log file could not be created')
            continue
        # NOTE(review): "log_size" is not a key of default_config in this file
        # (the nested "logging" -> "rotate_size" looks like its replacement);
        # confirm which key is intended. getConfig() returns the string "Error"
        # for keys it cannot resolve, hence the numeric check.
        limit = getConfig("log_size")
        if isinstance(limit, (int, float)) and size_kib > limit:
            stamp = datetime.now().strftime("%d.%m.%Y_%H-%M-%S")
            # NOTE(review): the content is gzip data although the suffix is
            # ".zip"; the suffix is kept as it was.
            archive_path = f'{logs_folder}{stamp}.zip'
            with open(log_path, 'rb') as f_in, gzip.open(archive_path, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
            if getConfig("debug"):
                print(f'Copied {archive_path}')
            # Truncate the live log now that its content is archived.
            open(log_path, 'w', encoding='utf-8').close()
        return
# Append a message to the log
def appendLog(message, startup=False, shutdown=False):
    """Append one timestamped line to ``latest.log``.

    Nothing is written when the "write_logs" config value is falsy. Before
    writing, checkSize() gets a chance to rotate the log.

    Args:
        message: Text of the log entry.
        startup: Tag the entry with ``[STARTUP]``.
        shutdown: Tag the entry with ``[SHUTDOWN]`` (used only when
            ``startup`` is false).

    Fixes over the previous version: the log is opened once and always closed
    (a second handle used to leak on every call), and a log file that cannot
    be created is reported instead of failing with NameError on an unbound
    file variable.
    """
    if not getConfig("write_logs"):
        return
    checkSize()
    if startup:
        tag = '[STARTUP] '
    elif shutdown:
        tag = '[SHUTDOWN] '
    else:
        tag = ''
    try:
        os.makedirs(logs_folder, exist_ok=True)
        log = open(logs_folder + 'latest.log', 'a', encoding='utf-8')
    except OSError:
        time.sleep(2)
        print('Log file could not be created')
        return
    with log:
        log.write(f'[{datetime.now().strftime("%H:%M:%S | %d.%m.%Y")}] {tag}{message}\n')
# Play a sound
def playSound(soundname, timing=''):
    """Play ``<sounds_folder><soundname>.wav`` with the backend for this OS.

    Does nothing when the "sounds" config value is falsy. On Windows winsound
    is tried first and playsound is the fallback; on Android the
    ``play-audio`` command is run; everywhere else playsound is used.
    Failures are written to the log and, in debug mode, also printed
    (not for the Android branch).

    Args:
        soundname: File name of the sound without the ".wav" suffix.
        timing: Prefix for the debug messages printed on failure.
    """
    if not getConfig("sounds"):
        return

    def report(backend, error, echo=True):
        # Log the failure and optionally echo it to the console in debug mode.
        appendLog(f'Could not play {backend}: {error}')
        if echo and getConfig("debug"):
            print(f'{timing} Не удалось проиграть {backend} звук "{soundname}" (Ошибка: {error})')

    system = getOS()
    if system == "windows":
        try:
            winsound.PlaySound(sounds_folder+soundname+".wav", winsound.SND_FILENAME)
        except Exception as winsound_error:
            report("winsound", winsound_error)
            try:
                playsound(sounds_folder+soundname+".wav")
            except Exception as playsound_error:
                report("playsound", playsound_error)
    elif system == "android":
        try:
            os.system(f'play-audio {sounds_folder}{soundname}.wav')
        except Exception as play_audio_error:
            report("play-audio", play_audio_error, echo=False)
    else:
        try:
            playsound(sounds_folder+soundname+".wav")
        except Exception as playsound_error:
            report("playsound", playsound_error)
# Strip unneeded characters from a string
def strCleaner(string):
    """Return ``string`` without newline characters and log the result.

    Args:
        string: Text to clean.

    Returns:
        The cleaned text.
    """
    # NOTE(review): '\"' is the same one-character string as '"', so this
    # replace() leaves the text unchanged. If escaping quotes was intended the
    # replacement would have to be '\\"' - confirm against the callers.
    output = string.replace('"', '\"')
    output = output.replace('\n', '')
    appendLog(f"String cleaned: {output}")
    return output
# Read a JSON file into Python objects
def jsonLoad(filename):
    """Parse the UTF-8 JSON file ``filename`` and return the decoded data."""
    # The "with" block closes the file; no explicit close() is required.
    with open(filename, "r", encoding='utf8') as file:
        return json.load(file)
# Write Python objects to a JSON file
def jsonSave(contents, filename):
    """Serialize ``contents`` (dict/list) into the file ``filename``.

    The file is written as UTF-8 with 4-space indentation and non-ASCII
    characters kept as they are.
    """
    # The "with" block closes the file; no explicit close() is required.
    with open(filename, "w", encoding='utf8') as file:
        json.dump(contents, file, ensure_ascii=False, indent=4)
# Add missing variables to the config
def repairConfig(some_dic):
    """Fill in top-level keys that ``some_dic`` lacks, using default_config.

    ``some_dic`` is modified in place, and config.json is rewritten each time
    a key is added. Only top-level keys are checked.

    Args:
        some_dic: The loaded configuration mapping to repair.
    """
    config_path = files_folder+'config.json'
    for key, default_value in default_config.items():
        if key in some_dic:
            continue
        some_dic[key] = default_value
        saveJson(config_path, some_dic)
# Change a configuration variable
def setConfig(some_var, some_val):
    """Set the top-level config key ``some_var`` to ``some_val`` in config.json.

    The files folder is created when missing. When config.json does not exist
    it is created from a copy of default_config with the new value applied.

    Args:
        some_var: Top-level key to set.
        some_val: Value to store (must be JSON-serializable to be saved).

    Returns:
        None normally; the string "Error" when the existing config.json
        cannot be read or parsed.

    Fixes over the previous version: default_config is copied instead of being
    modified in place, the fallback no longer re-reads an already closed file
    (which always raised and was silently swallowed), and the duplicated
    "folder missing" branch is merged with the regular path.
    """
    config_path = files_folder + 'config.json'
    if not os.path.exists(files_folder):
        os.mkdir(files_folder)
    if not os.path.exists(config_path):
        # Shallow copy: only a top-level key is assigned, so the module-level
        # defaults stay untouched.
        fresh_config = dict(default_config)
        fresh_config[some_var] = some_val
        saveJson(config_path, fresh_config)
        return None
    try:
        with open(config_path, encoding="utf-8") as json_file:
            config_list = json.load(json_file)
    except (OSError, ValueError):
        # Unreadable file or invalid JSON / encoding.
        return "Error"
    try:
        config_list[some_var] = some_val
        saveJson(config_path, config_list)
        appendLog(f'Changed variable "{some_var}" to {some_val}')
    except Exception:
        # Best effort, as before: try to restore missing default keys and do
        # not let a failed update or a failed log write reach the caller.
        try:
            repairConfig(config_list)
        except Exception:
            pass
    return None
# Read a configuration variable
def getConfig(some_var):
    """Return the value of the top-level config key ``some_var``.

    The files folder and config.json are created (from default_config) when
    missing. When the key is absent from the file but present in
    default_config, the default is written back through setConfig() and
    returned.

    Args:
        some_var: Top-level key to read.

    Returns:
        The stored value, or the default for a key missing from the file.
        The string "Error" is returned when config.json cannot be read or
        parsed, or when the key is neither in the file nor in default_config.

    Raises:
        KeyError: If config.json had to be created and ``some_var`` is not a
            key of default_config.

    Fixes over the previous version: the fallback no longer re-reads an
    already closed file (which always raised), the duplicated "folder missing"
    branch is merged with the regular path, and bare ``except:`` clauses are
    narrowed so KeyboardInterrupt / SystemExit are not swallowed.
    """
    config_path = files_folder + 'config.json'
    if not os.path.exists(files_folder):
        os.mkdir(files_folder)
    if not os.path.exists(config_path):
        saveJson(config_path, default_config)
        return default_config[some_var]
    try:
        with open(config_path, encoding="utf-8") as json_file:
            config_list = json.load(json_file)
    except (OSError, ValueError):
        # Unreadable file or invalid JSON / encoding.
        return "Error"
    try:
        return config_list[some_var]
    except (KeyError, IndexError, TypeError):
        pass
    if some_var in default_config:
        try:
            # Persist the default so that the next read finds it in the file.
            setConfig(some_var, default_config[some_var])
            return default_config[some_var]
        except Exception:
            pass
    # Unknown key or failed write-back: restore any missing default keys, as
    # the previous implementation did, then fall back.
    try:
        repairConfig(config_list)
    except Exception:
        pass
    return default_config.get(some_var, "Error")
# Get the state of a process
def getState(process="CptHost.exe"):
    """Tell whether a process with the given image name appears to be running.

    Only implemented for Windows: ``tasklist`` is queried first and
    ``wmic`` is the fallback when that raises.

    Args:
        process: Image name to look for.

    Returns:
        True or False on Windows; None on any other system.

    Fixes over the previous version: the wmic fallback now searches for
    ``process`` instead of the hard-coded "CptHost.exe", and the backslash in
    the tasklist filter is written as ``\\`` (the former ``\{`` was an
    invalid escape sequence; the resulting command text is unchanged).
    """
    if getOS() != 'windows':
        return None
    try:
        output = os.popen(f'tasklist /fi "IMAGENAME eq {process}" /fi "USERNAME ne NT AUTHORITY\\{getpass.getuser()}"').read()
    except Exception as exp:
        appendLog(f'Failed to get state using tasklist: {exp}')
        output = os.popen('wmic process get description, processid').read()
    return process in output
# Save data to a json file
def saveJson(filename, value):
    """Write ``value`` as JSON into ``filename``.

    The file is written as UTF-8 with 4-space indentation and non-ASCII
    characters kept as they are.
    """
    # The "with" block closes the file; no explicit close() is required.
    with open(filename, 'w', encoding="utf-8") as target:
        json.dump(value, target, indent=4, ensure_ascii=False)
# Event loop shared by all fire_and_forget() calls; created on first use.
_loop = None


def fire_and_forget(coro):
    """Schedule ``coro`` on a background event loop without waiting for it.

    The first call creates a new event loop and runs it forever in a daemon
    thread; every call then asks that loop, in a thread-safe way, to wrap
    ``coro`` in a task.

    Args:
        coro: Coroutine object to run.
    """
    global _loop
    if _loop is None:
        _loop = asyncio.new_event_loop()
        runner = threading.Thread(target=_loop.run_forever, daemon=True)
        runner.start()
    _loop.call_soon_threadsafe(asyncio.create_task, coro)
def configSet(key: str, value, *args: str):
    """Set a (possibly nested) config key to a value and save config.json.

    Args:
        * key (str): The last key of the keys path.
        * value (str/int/float/list/dict/None): Some needed value.
        * *args (str): Path to key like: dict[args][key].

    Raises:
        KeyError: If one of the ``args`` path components does not exist.

    The path is walked directly instead of building an assignment as source
    text and running it through exec(): string values that contain quotes,
    backslashes (e.g. Windows paths) or newlines used to be corrupted or to
    fail, and arbitrary text could end up being executed.
    """
    config_path = f"{files_folder}config.json"
    this_dict = jsonLoad(config_path)
    parent = this_dict
    for dict_key in args:
        parent = parent[dict_key]
    parent[key] = value
    jsonSave(this_dict, config_path)
    return
def configGet(key: str, *args: str):
    """Read a (possibly nested) value from config.json.

    Args:
        * key (str): The last key of the keys path.
        * *args (str): Path to key like: dict[args][key].

    Returns:
        * any: Value stored under the given path.

    Raises:
        KeyError: If any component of the path does not exist.
    """
    node = jsonLoad(f"{files_folder}config.json")
    # Descend through the parent keys first, then the final key.
    for step in (*args, key):
        node = node[step]
    return node
async def execBind(action: str, kind="command"):
    """Run one bound action.

    Args:
        * action (str): Message text, key combination or shell command.
        * kind (str, optional): "message" sends ``action`` via
          telegramSendText, "keybind" presses and releases it with the
          keyboard module, anything else runs it with os.system.
          Defaults to "command".
    """
    if kind == "message":
        telegramSendText(message=action)
        return
    if kind == "keybind":
        keyboard.press_and_release(action)
        return
    os.system(action)