# TelegramPoster/main.py
#
# 340 lines
# 16 KiB
# Python

import os
import random
import shutil
import subprocess
import sys
from threading import Thread
import time
import traceback
from modules.logging import logWrite
from modules.utils import configGet, jsonLoad, jsonSave, killProc, locale
# Args =====================================================================================================================================
# Maintenance switches. They are processed before the bot libraries are
# imported, so they can be used without starting the bot (see --norun).

# --move-sent: move every file recorded as sent in the index from the queue
# directory to the sent directory.
if "--move-sent" in sys.argv:
    for entry in jsonLoad(configGet("index", "locations"))["sent"]:
        try:
            shutil.move(configGet("queue", "locations")+os.sep+entry, configGet("sent", "locations")+os.sep+entry)
        except FileNotFoundError:
            logWrite(locale("move_sent_doesnt_exist", "console", locale=configGet("locale")).format(entry))
        except Exception as exp:
            logWrite(locale("move_sent_doesnt_exception", "console", locale=configGet("locale")).format(entry, exp))
    logWrite(locale("move_sent_completed", "console", locale=configGet("locale")))

# --cleanup: delete every file recorded as sent from both the queue and the
# sent directory. Requires --confirm because it is destructive.
if "--cleanup" in sys.argv:
    if "--confirm" in sys.argv:
        index = jsonLoad(configGet("index", "locations"))
        for entry in index["sent"]:
            try:
                for location in ("queue", "sent"):
                    try:
                        os.remove(configGet(location, "locations")+os.sep+entry)
                    except FileNotFoundError:
                        # The file may legitimately live in only one of the two folders.
                        pass
            except Exception as exp:
                logWrite(locale("cleanup_exception", "console", locale=configGet("locale")).format(entry, exp))
        # Second argument is the destination path, as in every other jsonSave call.
        jsonSave(index, configGet("index", "locations"))
        logWrite(locale("cleanup_completed", "console", locale=configGet("locale")))
    else:
        logWrite(locale("cleanup_unathorized", "console", locale=configGet("locale")))

# --cleanup-index: forget which files were already sent. Requires --confirm.
if "--cleanup-index" in sys.argv:
    if "--confirm" in sys.argv:
        index = jsonLoad(configGet("index", "locations"))
        index["sent"] = []
        # Second argument is the destination path, as in every other jsonSave call.
        jsonSave(index, configGet("index", "locations"))
        logWrite(locale("cleanup_index_completed", "console", locale=configGet("locale")))
    else:
        logWrite(locale("cleanup_index_unathorized", "console", locale=configGet("locale")))

# --norun: stop after the maintenance switches above instead of starting the bot.
if "--norun" in sys.argv:
    logWrite(locale("passed_norun", "console", locale=configGet("locale")))
    sys.exit()
#===========================================================================================================================================
# Import ===================================================================================================================================
# Third-party dependencies are imported under a guard so that a missing
# package produces a localized hint instead of a raw traceback.
try:
    import schedule # type: ignore
    from pyrogram import Client, filters, idle # type: ignore
    from pyrogram.types import ( # type: ignore
        ChatPermissions,
        ReplyKeyboardMarkup,
        InlineKeyboardMarkup,
        InlineKeyboardButton,
        BotCommand,
        BotCommandScopeChat,
    )
except ModuleNotFoundError:
    print(locale("deps_missing", "console", locale=configGet("locale")), flush=True)
    sys.exit()
#===========================================================================================================================================

# PID of this process; reported in startup/shutdown messages and passed to killProc.
pid = os.getpid()

# Bot client; credentials come from the "bot" section of the config.
app = Client(
    "duptsiaposter",
    bot_token=configGet("bot_token", "bot"),
    api_id=configGet("api_id", "bot"),
    api_hash=configGet("api_hash", "bot"),
)
def _media_type(file_name, photo_exts, video_exts):
    """Return "video", "photo" or None according to the file name's extension.

    Video is checked first so that an extension listed in both groups is
    treated as a video.
    """
    if file_name.endswith(video_exts):
        return "video"
    if file_name.endswith(photo_exts):
        return "photo"
    return None


def send_content():
    """Post one random, not yet sent file from the queue to the channel.

    Files already recorded in the index, or whose extension is in neither the
    photo nor the video extension list, are ignored. After posting, the file
    name is appended to the index, the file is optionally moved to the sent
    directory and the admin is optionally notified. Any exception is logged
    and, if error reports are enabled, forwarded to the admin.
    """
    try:
        list_sent = jsonLoad(configGet("index", "locations"))
        queue_dir = configGet("queue", "locations")
        photo_exts = tuple(configGet("photo", "posting", "extensions"))
        video_exts = tuple(configGet("video", "posting", "extensions"))
        already_sent = set(list_sent["sent"])

        # Build a new list instead of removing items from the list being
        # iterated, which silently skipped entries.
        candidates = [
            file for file in os.listdir(queue_dir)
            if file not in already_sent
            and _media_type(file, photo_exts, video_exts) is not None
        ]

        if not candidates:
            logWrite(locale("post_empty", "console", locale=configGet("locale")))
            if configGet("error", "reports"):
                app.send_message(configGet("admin", "reports"), locale("post_empty", "message", locale=configGet("locale"))) # type: ignore
            return

        candidate_file = random.choice(candidates)
        candidate = queue_dir+os.sep+candidate_file
        # Classify the file that was actually picked, not the last one scanned.
        ext_type = _media_type(candidate_file, photo_exts, video_exts)

        send_kwargs = {"disable_notification": configGet("silent", "posting")}
        if configGet("enabled", "caption"):
            if configGet("link", "caption") is not None:
                send_kwargs["caption"] = f"[{configGet('text', 'caption')}]({configGet('link', 'caption')})"
            else:
                send_kwargs["caption"] = configGet('text', 'caption')

        send = app.send_photo if ext_type == "photo" else app.send_video
        sent = send(configGet("channel", "posting"), candidate, **send_kwargs) # type: ignore

        list_sent["sent"].append(candidate_file)
        jsonSave(list_sent, configGet("index", "locations"))

        if configGet("move_sent", "posting"):
            shutil.move(candidate, configGet("sent", "locations")+os.sep+candidate_file)

        if configGet("sent", "reports"):
            app.send_message(configGet("admin", "reports"), f"Posted `{candidate_file}`", disable_web_page_preview=True, reply_markup=InlineKeyboardMarkup([
                [InlineKeyboardButton("View in channel", url=sent.link)] # type: ignore
            ])) # type: ignore

    except Exception as exp:
        logWrite(f"Could not send content due to {exp}\nTraceback: {traceback.format_exc()}")
        if configGet("error", "reports"):
            app.send_message(configGet("admin", "reports"), locale("post_exception", "message", locale=configGet("locale")).format(exp, traceback.format_exc())) # type: ignore
@app.on_message(~ filters.scheduled & filters.command(["start"], prefixes="/"))
def start(app, msg):
    """Answer /start with the "start" message in the sender's language.

    Senders whose id is in the blocked list get no reply.
    """
    sender = msg.from_user
    if sender.id in jsonLoad(configGet("blocked", "locations")):
        return
    msg.reply_text(locale("start", "message", locale=sender.language_code))
@app.on_message(~ filters.scheduled & filters.command(["rules", "help"], prefixes="/"))
def start(app, msg):
    """Answer /rules and /help with the "rules" message in the sender's language.

    Senders whose id is in the blocked list get no reply.
    """
    # NOTE(review): this function reuses the name `start` and therefore rebinds
    # the module-level name of the /start handler defined just before it.
    sender = msg.from_user
    if sender.id in jsonLoad(configGet("blocked", "locations")):
        return
    msg.reply_text(locale("rules", "message", locale=sender.language_code))
@app.on_message(~ filters.scheduled & filters.command(["kill", "die", "reboot"], prefixes=["", "/"]))
def kill(app, msg):
    """Let the configured admin stop the bot process.

    Messages from anyone else are ignored. The admin gets the "shutdown"
    message containing the PID before the process is killed.
    """
    if msg.from_user.id != configGet("admin", "reports"):
        return
    shutdown_text = locale("shutdown", "message", locale=configGet("locale"))
    msg.reply_text(shutdown_text.format(str(pid)))
    killProc(pid)
# Submission =====================================================================================================================================
def subLimit(user):
    """Store the current time as the moment of the user's latest submission.

    The timestamps file maps the stringified user id to a `time.time()` value.
    """
    timestamps = jsonLoad(configGet("submit", "locations"))
    timestamps.update({str(user.id): time.time()})
    jsonSave(timestamps, configGet("submit", "locations"))
def subLimited(user):
    """Tell whether the user is still inside the submission cooldown.

    Returns False for the admin and for users with no recorded submission;
    otherwise True only while less than the configured timeout has passed
    since the recorded submission time.
    """
    # The admin is never rate limited.
    if user.id == configGet("admin", "reports"):
        return False

    timestamps = jsonLoad(configGet("submit", "locations"))
    user_key = str(user.id)
    if user_key not in timestamps:
        return False

    return (time.time() - timestamps[user_key]) < configGet("timeout", "submission")
def subBlock(user):
    """Add `user` to the stored blocked list if it is not there yet."""
    blocked_users = jsonLoad(configGet("blocked", "locations"))
    if user in blocked_users:
        # Already blocked, nothing to store.
        return
    blocked_users.append(user)
    jsonSave(blocked_users, configGet("blocked", "locations"))
def subUnblock(user):
    """Remove `user` from the stored blocked list if it is present."""
    blocked_users = jsonLoad(configGet("blocked", "locations"))
    if user not in blocked_users:
        # Not blocked, nothing to store.
        return
    blocked_users.remove(user)
    jsonSave(blocked_users, configGet("blocked", "locations"))
# The media filters are parenthesised so that `~ filters.scheduled` applies to
# photos, videos and animations alike (`&` binds tighter than `|`).
@app.on_message(~ filters.scheduled & (filters.photo | filters.video | filters.animation))
def get_submission(_, msg):
    """Forward a user's media submission to the admin for moderation.

    Blocked senders are ignored. Senders inside the cooldown receive the
    "sub_cooldown" message. Otherwise the message is copied to the admin with
    a caption identifying the sender and inline buttons to accept, decline,
    block or unblock; the sender is told the submission was sent and the
    cooldown timestamp is recorded.
    """
    sender = msg.from_user
    if sender.id in jsonLoad(configGet("blocked", "locations")):
        return

    user_locale = sender.language_code

    if subLimited(sender):
        msg.reply_text(locale("sub_cooldown", "message", locale=user_locale).format(str(configGet("timeout", "submission"))))
        return

    # Original caption (if any) followed by a description of the sender.
    caption = str(msg.caption) if msg.caption is not None else ""
    caption += locale("sub_by", "message", locale=configGet("locale"))
    if sender.first_name is not None:
        caption += f" {sender.first_name}"
    if sender.last_name is not None:
        caption += f" {sender.last_name}"
    if sender.username is not None:
        caption += f" (@{sender.username})"
    if sender.phone_number is not None:
        caption += f" ({sender.phone_number})"

    # callback_data carries the sender id (and message id) for the callback handlers.
    msg.copy(configGet("admin", "reports"), caption=caption, reply_markup=InlineKeyboardMarkup([
            [
                InlineKeyboardButton(text=locale("sub_yes", "button", locale=configGet("locale")), callback_data=f"sub_yes_{sender.id}_{msg.id}"),
                InlineKeyboardButton(text=locale("sub_no", "button", locale=configGet("locale")), callback_data=f"sub_no_{sender.id}_{msg.id}")
            ],
            [
                InlineKeyboardButton(text=locale("sub_block", "button", locale=configGet("locale")), callback_data=f"sub_block_{sender.id}")
            ],
            [
                InlineKeyboardButton(text=locale("sub_unblock", "button", locale=configGet("locale")), callback_data=f"sub_unblock_{sender.id}")
            ]
        ]
    ))

    msg.reply_text(locale("sub_sent", "message", locale=user_locale), quote=True)
    subLimit(sender)
# Raw string: `\s` / `\S` are regex classes, not string escape sequences.
@app.on_callback_query(filters.regex(r"sub_yes_[\s\S]*_[\s\S]*")) # type: ignore
def callback_query_yes(app, clb): # type: ignore
    """Accept a submission: download its media into the queue and notify both sides.

    The callback data has the form ``sub_yes_<user id>_<message id>``. If the
    message cannot be fetched or its media cannot be downloaded, the person
    who pressed the button gets an alert and nothing else happens.
    """
    fullclb = clb.data.split("_")
    user_locale = clb.from_user.language_code

    try:
        submission = app.get_messages(int(fullclb[2]), int(fullclb[3]))
    except Exception:
        clb.answer(text=locale("sub_msg_unavail", "message", locale=user_locale), show_alert=True)
        return

    try:
        # A path ending in a separator is passed so the media lands in the queue directory.
        app.download_media(submission, file_name=configGet("queue", "locations")+os.sep)
    except Exception:
        clb.answer(text=locale("sub_media_unavail", "message", locale=user_locale), show_alert=True)
        return

    submission.reply_text(locale("sub_yes", "message", locale=submission.from_user.language_code), quote=True)
    clb.answer(text=locale("sub_yes", "callback", locale=user_locale).format(fullclb[2]), show_alert=True)
# Raw string: `\s` / `\S` are regex classes, not string escape sequences.
@app.on_callback_query(filters.regex(r"sub_no_[\s\S]*_[\s\S]*")) # type: ignore
def callback_query_no(app, clb): # type: ignore
    """Decline a submission and notify both the submitter and the moderator.

    The callback data has the form ``sub_no_<user id>_<message id>``. If the
    message cannot be fetched, the person who pressed the button gets an
    alert and nothing else happens.
    """
    fullclb = clb.data.split("_")
    user_locale = clb.from_user.language_code

    try:
        submission = app.get_messages(int(fullclb[2]), int(fullclb[3]))
    except Exception:
        clb.answer(text=locale("sub_msg_unavail", "message", locale=user_locale), show_alert=True)
        return

    submission.reply_text(locale("sub_no", "message", locale=submission.from_user.language_code), quote=True)
    clb.answer(text=locale("sub_no", "callback", locale=user_locale).format(fullclb[2]), show_alert=True)
# Raw string: `\s` / `\S` are regex classes, not string escape sequences.
@app.on_callback_query(filters.regex(r"sub_block_[\s\S]*")) # type: ignore
def callback_query_block(app, clb): # type: ignore
    """Block the user whose id is encoded in the callback data.

    The callback data has the form ``sub_block_<user id>``. The user is sent
    a message, added to the blocked list, and the moderator gets an alert.
    """
    fullclb = clb.data.split("_")
    user_locale = clb.from_user.language_code
    target_id = int(fullclb[2])

    # Notifying the user is best effort: a delivery failure must not prevent
    # the block from being recorded.
    # NOTE(review): the text sent is the "sub_msg_unavail" message - confirm
    # this is the intended notification for a blocked user.
    try:
        app.send_message(target_id, locale("sub_msg_unavail", "message", locale=configGet("locale")))
    except Exception as exp:
        logWrite(f"Could not notify user {target_id} about the block due to {exp}")

    subBlock(target_id)
    clb.answer(text=locale("sub_block", "callback", locale=user_locale).format(fullclb[2]), show_alert=True)
# Raw string: `\s` / `\S` are regex classes, not string escape sequences.
@app.on_callback_query(filters.regex(r"sub_unblock_[\s\S]*")) # type: ignore
def callback_query_unblock(app, clb): # type: ignore
    """Unblock the user whose id is encoded in the callback data.

    The callback data has the form ``sub_unblock_<user id>``. The user is sent
    a message, removed from the blocked list, and the moderator gets an alert.
    """
    fullclb = clb.data.split("_")
    user_locale = clb.from_user.language_code
    target_id = int(fullclb[2])

    # Notifying the user is best effort: a delivery failure must not prevent
    # the unblock from being recorded.
    # NOTE(review): the text sent is the "sub_msg_unavail" message - confirm
    # this is the intended notification for an unblocked user.
    try:
        app.send_message(target_id, locale("sub_msg_unavail", "message", locale=configGet("locale")))
    except Exception as exp:
        logWrite(f"Could not notify user {target_id} about the unblock due to {exp}")

    subUnblock(target_id)
    clb.answer(text=locale("sub_unblock", "callback", locale=user_locale).format(fullclb[2]), show_alert=True)
#===========================================================================================================================================
# Register one daily posting job for every time listed under posting -> time.
for entry in configGet("time", "posting"):
    daily_job = schedule.every().day.at(entry)
    daily_job.do(send_content)
def background_task():
    """Run pending scheduled jobs once per second until interrupted.

    An exception raised while running jobs is logged and the loop continues.
    On KeyboardInterrupt the shutdown is logged, optionally reported to the
    admin, and the process is killed.
    """
    try:
        while True:
            try:
                schedule.run_pending()
            except Exception as exp:
                # Log instead of silently swallowing; keep the scheduler alive.
                logWrite(locale("exception_occured", "console", locale=configGet("locale")).format(exp))
            # Sleep outside the try so a failing job cannot turn this into a busy loop.
            time.sleep(1)
    except KeyboardInterrupt:
        logWrite(locale("keyboard_interrupt", "console", locale=configGet("locale")))
        if configGet("shutdown", "reports"):
            app.send_message(configGet("admin", "reports"), locale("shutdown", "message", locale=configGet("locale")).format(str(pid))) # type: ignore
        killProc(pid)
if __name__ == "__main__":
    # Entry point: start the client, start the scheduler thread, register the
    # bot commands, then block until the client is stopped.
    logWrite(locale("startup", "console", locale=configGet("locale")).format(str(pid)))

    app.start() # type: ignore

    if configGet("startup", "reports"):
        app.send_message(configGet("admin", "reports"), locale("startup", "message", locale=configGet("locale")).format(str(pid))) # type: ignore

    t = Thread(target=background_task)
    t.start()

    command_names = configGet("commands")

    # One localized command list per locale file; the file name without
    # ".json" is used as the language code.
    for entry in os.listdir(configGet("locale", "locations")):
        if entry.endswith(".json"):
            language = entry[:-len(".json")]
            commands_list = [BotCommand(command, locale(command, "commands", locale=language)) for command in command_names]
            app.set_bot_commands(commands_list, language_code=language)

    # Default command list, used when no language-specific list applies.
    commands_list = [BotCommand(command, locale(command, "commands", locale=configGet("locale_fallback"))) for command in command_names]
    app.set_bot_commands(commands_list)

    # Admin-only command, scoped to the admin's chat.
    app.set_bot_commands([ # type: ignore
        BotCommand("reboot", locale("reboot", "commands_admin", locale=configGet("locale"))),
    ],
    scope=BotCommandScopeChat(chat_id=configGet("admin", "reports")))

    idle()

    # Honour the same "shutdown" reports setting as the interrupt path in background_task.
    if configGet("shutdown", "reports"):
        app.send_message(configGet("admin", "reports"), locale("shutdown", "message", locale=configGet("locale")).format(str(pid))) # type: ignore
    logWrite(locale("shutdown", "console", locale=configGet("locale")).format(str(pid)))

    killProc(pid)