"""Admin ``/export`` command: dump a database collection as CSV and JSON files."""

from contextlib import suppress
from csv import QUOTE_ALL
from os import makedirs, path, remove
from typing import Any, Callable, Dict, List, Tuple
from uuid import uuid1

import aiofiles
from aiocsv.writers import AsyncDictWriter
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.enums.chat_action import ChatAction
from pyrogram.types import Message
from ujson import dumps

from app import app
from modules import custom_filters
from modules.database import col_applications, col_sponsorships, col_warnings
from modules.logging import logWrite
from modules.utils import locale, should_quote

# (CSV header, rows for the JSON dump, rows for the CSV dump)
_ExportData = Tuple[List[str], List[Dict[str, Any]], List[Dict[str, Any]]]


def _collect_applications() -> _ExportData:
    """Read every application; JSON keeps the nested answers, CSV flattens them."""
    header_csv = ["user", "date", "admin"] + [
        f"question_{number}" for number in range(1, 11)
    ]
    json_rows = []
    csv_rows = []

    for entry in col_applications.find():
        del entry["_id"]
        entry["date"] = entry["date"].isoformat()
        answers = entry["application"]
        answers["2"] = answers["2"].isoformat()
        json_rows.append(entry)

        # Flat copy for CSV: every answer becomes its own "question_N" column,
        # numbered by its position in the stored mapping.
        flat = {key: value for key, value in entry.items() if key != "application"}
        for index, key in enumerate(answers):
            flat[f"question_{index + 1}"] = answers[key]
        # Answer "3" is a mapping; render it as a single readable string.
        place = answers["3"]
        flat[
            "question_3"
        ] = f"{place['name']} ({place['adminName1']}, {place['countryName']})"
        csv_rows.append(flat)

    return header_csv, json_rows, csv_rows


def _collect_warnings() -> _ExportData:
    """Read every warning, exposing Mongo's ``_id`` as a string ``id`` field."""
    header_csv = [
        "id",
        "user",
        "admin",
        "date",
        "reason",
        "active",
        "revoke_date",
    ]
    rows = []

    for document in col_warnings.find():
        # Rename "_id" to "id" while keeping the original key order.
        entry = {
            ("id" if key == "_id" else key): value for key, value in document.items()
        }
        entry["id"] = str(entry["id"])
        entry["date"] = entry["date"].isoformat()
        if entry["revoke_date"] is not None:
            entry["revoke_date"] = entry["revoke_date"].isoformat()
        rows.append(entry)

    # The same row dicts serve both output formats.
    return header_csv, rows, rows


def _collect_sponsorships() -> _ExportData:
    """Read every sponsorship; JSON keeps the nested data, CSV flattens it."""
    header_csv = [
        "user",
        "date",
        "admin",
        "streamer",
        "expires",
        "proof",
        "label",
    ]
    json_rows = []
    csv_rows = []

    for entry in col_sponsorships.find():
        del entry["_id"]
        entry["date"] = entry["date"].isoformat()
        details = entry["sponsorship"]
        details["expires"] = details["expires"].isoformat()
        json_rows.append(entry)

        # Flat copy for CSV: nested sponsorship fields become top-level columns.
        flat = {key: value for key, value in entry.items() if key != "sponsorship"}
        flat.update(details)
        csv_rows.append(flat)

    return header_csv, json_rows, csv_rows


def _collect_dated_rows(header_csv: List[str]) -> _ExportData:
    """Read documents that only need ``_id`` dropped and ``date`` serialized.

    NOTE(review): both "bans" and "event" read from ``col_warnings``, which
    looks like a copy-paste leftover -- confirm which collections were meant.
    Documents with fields outside ``header_csv`` will still be rejected by
    the CSV writer.
    """
    rows = []

    for entry in col_warnings.find():
        # Mongo documents carry "_id" (there is no "id" key to delete).
        del entry["_id"]
        entry["date"] = entry["date"].isoformat()
        rows.append(entry)

    return header_csv, rows, rows


def _collect_bans() -> _ExportData:
    """Collect rows for the "bans" export."""
    return _collect_dated_rows(["user", "admin", "date"])


def _collect_event() -> _ExportData:
    """Collect rows for the "event" export."""
    return _collect_dated_rows(["user", "stage", "date"])


# Maps the command argument to the function gathering that export's data.
_COLLECTORS: Dict[str, Callable[[], _ExportData]] = {
    "applications": _collect_applications,
    "warnings": _collect_warnings,
    "sponsorships": _collect_sponsorships,
    "bans": _collect_bans,
    "event": _collect_event,
}


@app.on_message(
    custom_filters.enabled_general
    & ~filters.scheduled
    & filters.command(["export"], prefixes=["/"])
    & custom_filters.admin
)
async def cmd_export(app: Client, msg: Message):
    """Handle ``/export <selection>``: reply with the data as CSV and JSON.

    ``selection`` must be one of the keys of ``_COLLECTORS``; a missing or
    unknown value gets the localized syntax hint instead. The files are
    written under ``tmp/`` with a random name and are removed afterwards,
    whether or not writing and uploading succeeded.
    """
    selection = msg.command[1].lower() if len(msg.command) > 1 else None
    collector = _COLLECTORS.get(selection)

    if collector is None:
        await msg.reply_text(
            locale("syntax_export", "message", locale=msg.from_user),
            quote=should_quote(msg),
        )
        return

    logWrite(f"Admin {msg.from_user.id} requested export of {selection}")

    makedirs("tmp", exist_ok=True)
    temp_file = path.join("tmp", str(uuid1()))
    csv_path = temp_file + ".csv"
    json_path = temp_file + ".json"

    await app.send_chat_action(msg.chat.id, ChatAction.TYPING)

    header_csv, output_json, output_csv = collector()

    try:
        # Saving CSV (newline="" leaves line endings to the csv writer)
        async with aiofiles.open(
            csv_path, mode="w", encoding="utf-8", newline=""
        ) as file:
            writer = AsyncDictWriter(
                file, header_csv, restval="NULL", quoting=QUOTE_ALL
            )
            await writer.writeheader()
            await writer.writerows(output_csv)

        # Saving JSON
        async with aiofiles.open(json_path, mode="w", encoding="utf-8") as file:
            await file.write(
                dumps(
                    output_json,
                    ensure_ascii=False,
                    escape_forward_slashes=False,
                    indent=4,
                )
            )

        # Sending CSV
        await app.send_chat_action(msg.chat.id, ChatAction.UPLOAD_DOCUMENT)
        await msg.reply_document(
            document=csv_path,
            file_name=f"{selection}.csv",
            quote=should_quote(msg),
        )

        # Sending JSON
        await app.send_chat_action(msg.chat.id, ChatAction.UPLOAD_DOCUMENT)
        await msg.reply_document(
            document=json_path,
            file_name=f"{selection}.json",
            quote=should_quote(msg),
        )
    finally:
        # Removing temp files even when writing or uploading failed; a file
        # may not exist yet if the failure happened before it was created.
        for leftover in (csv_path, json_path):
            with suppress(FileNotFoundError):
                remove(leftover)