Files
QuizBot/cogs/cog_user.py

232 lines
7.7 KiB
Python

from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any
from zoneinfo import ZoneInfo
from bson import ObjectId
from bson.errors import InvalidId
from discord import (
ApplicationContext,
SlashCommandGroup,
User,
option,
File,
TextChannel,
)
from discord.ext.commands import Cog
from libbot.i18n import _, in_every_locale
from classes import PycordUser, PycordEvent, PycordGuild
from classes.errors import GuildNotFoundError
from classes.pycord_bot import PycordBot
from modules.database import col_users
from modules.utils import is_operation_confirmed
class CogUser(Cog):
"""Cog with user management commands."""
def __init__(self, bot: PycordBot):
self.bot: PycordBot = bot
command_group: SlashCommandGroup = SlashCommandGroup(
"user",
description=_("description", "commands", "user"),
description_localizations=in_every_locale("description", "commands", "user"),
)
# TODO Implement the command
# @command_group.command(
# name="create_channel",
# description="Create channel for the user",
# )
# @option(
# "user",
# description="Selected user",
# )
# async def command_user_create_channel(self, ctx: ApplicationContext, user: User) -> None:
# await ctx.respond("Not implemented.")
# TODO Implement the command
@command_group.command(
name="update_channels",
description="Update user's event channels",
)
@option(
"user",
description="Selected user",
)
async def command_user_update_channels(self, ctx: ApplicationContext, user: User) -> None:
try:
guild: PycordGuild = await self.bot.find_guild(ctx.guild.id)
except (InvalidId, GuildNotFoundError):
await ctx.respond(self.bot._("unexpected_error", "messages", locale=ctx.locale))
return
pycord_user: PycordUser = await self.bot.find_user(user.id, ctx.guild.id)
events: List[PycordEvent] = []
pipeline: List[Dict[str, Any]] = [
{"$match": {"id": pycord_user.id}},
{
"$lookup": {
"from": "events",
"localField": "registered_event_ids",
"foreignField": "_id",
"as": "registered_events",
}
},
{
"$match": {
"registered_events.ended": None,
"registered_events.ends": {"$gt": datetime.now(tz=ZoneInfo("UTC"))},
"registered_events.starts": {"$lt": datetime.now(tz=ZoneInfo("UTC"))},
"registered_events.is_cancelled": False,
}
},
]
async for result in col_users.aggregate(pipeline):
for registered_event in result["registered_events"]:
events.append(PycordEvent(**registered_event))
for event in events:
if pycord_user.current_event_id is not None and pycord_user.current_event_id != event._id:
continue
if pycord_user.current_event_id is None:
await pycord_user.set_event(event._id, cache=self.bot.cache)
channel: TextChannel | None = await pycord_user.fix_event_channel(
self.bot, ctx.guild, guild, event, cache=self.bot.cache
)
if channel is None:
continue
thumbnail: File | None = (
None
if event.thumbnail is None
else File(Path(f"data/{event.thumbnail['id']}"), event.thumbnail["filename"])
)
await channel.send(
self.bot._("notice_event_already_started", "messages").format(event_name=event.name),
file=thumbnail,
)
stage_id: ObjectId = (
event.stage_ids[0] if pycord_user.current_stage_id is None else pycord_user.current_stage_id
)
await pycord_user.set_event_stage(stage_id, cache=self.bot.cache)
await self.bot.send_stage_question(channel, event, await self.bot.find_event_stage(stage_id))
await ctx.respond("Channels were updated.")
# TODO Implement the command
# @command_group.command(
# name="delete_channel",
# description="Delete user's channel",
# )
# @option(
# "user",
# description="Selected user",
# )
# @option("confirm", description="Confirmation of the operation", required=False)
# async def command_user_delete_channel(
# self, ctx: ApplicationContext, user: User, confirm: bool = False
# ) -> None:
# await ctx.respond("Not implemented.")
@command_group.command(
name="jail",
description=_("description", "commands", "user_jail"),
description_localizations=in_every_locale("description", "commands", "user_jail"),
)
@option(
"user",
description=_("description", "commands", "user_jail", "options", "user"),
description_localizations=in_every_locale(
"description", "commands", "user_jail", "options", "user"
),
)
@option(
"confirm",
description=_("description", "commands", "user_jail", "options", "confirm"),
description_localizations=in_every_locale(
"description", "commands", "user_jail", "options", "confirm"
),
required=False,
)
async def command_user_jail(self, ctx: ApplicationContext, user: User, confirm: bool = False) -> None:
if not (await is_operation_confirmed(ctx, confirm)):
return
pycord_user: PycordUser = await self.bot.find_user(user, ctx.guild)
if pycord_user.is_jailed:
await ctx.respond(
self.bot._("user_jail_already_jailed", "messages", locale=ctx.locale).format(
display_name=user.display_name
),
ephemeral=True,
)
return
await pycord_user.jail(self.bot.cache)
await ctx.respond(
self.bot._("user_jail_successful", "messages", locale=ctx.locale).format(
display_name=user.display_name
)
)
@command_group.command(
name="unjail",
description=_("description", "commands", "user_unjail"),
description_localizations=in_every_locale("description", "commands", "user_unjail"),
)
@option(
"user",
description=_("description", "commands", "user_unjail", "options", "user"),
description_localizations=in_every_locale(
"description", "commands", "user_unjail", "options", "user"
),
)
@option(
"confirm",
description=_("description", "commands", "user_unjail", "options", "confirm"),
description_localizations=in_every_locale(
"description", "commands", "user_unjail", "options", "confirm"
),
required=False,
)
async def command_user_unjail(self, ctx: ApplicationContext, user: User, confirm: bool = False) -> None:
if not (await is_operation_confirmed(ctx, confirm)):
return
pycord_user: PycordUser = await self.bot.find_user(user, ctx.guild)
if not pycord_user.is_jailed:
await ctx.respond(
self.bot._("user_unjail_not_jailed", "messages", locale=ctx.locale).format(
display_name=user.display_name
),
ephemeral=True,
)
return
await pycord_user.unjail(self.bot.cache)
await ctx.respond(
self.bot._("user_unjail_successful", "messages", locale=ctx.locale).format(
display_name=user.display_name
)
)
def setup(bot: PycordBot) -> None:
bot.add_cog(CogUser(bot))