WIP: Part 2

This commit is contained in:
Profitroll 2024-06-02 21:51:07 +02:00
parent 83ef964122
commit 7bed604b9d
Signed by: profitroll
GPG Key ID: FA35CAB49DACD3B2
23 changed files with 1372 additions and 133 deletions

View File

@ -1,2 +1,51 @@
# GloryBot <h1 align="center">GloryBot</h1>
<p align="center">
<a href="https://git.end-play.xyz/VA-11_Hall-A/GloryBot/src/branch/master/LICENSE"><img alt="License: GPL" src="https://img.shields.io/badge/License-GPL-blue"></a>
<a href="https://git.end-play.xyz/VA-11_Hall-A/GloryBot"><img alt="Code style: black" src="https://img.shields.io/badge/code%20style-black-000000.svg"></a>
</p>
Discord bot based on [Pycord](https://github.com/Pycord-Development/pycord) and [LibBotUniversal](https://git.end-play.xyz/profitroll/LibBotUniversal) that provides interesting captcha for your server.
## Requirements
* MongoDB
* Python 3.8+ (3.11 is recommended)
* Git (when installing from source)
## Installation
There are two kinds of installation supported. Stable one is basically a stable release that you can download and use. Upgrades will need some more complicated actions on your side. The source one provides you with easier code management, yet is meant to be used by more advanced users.
### Stable
1. Install all [requirements](#requirements) except for Git
2. Go to [Releases](/releases) and download the stable release of your choice
3. Unpack the archive somewhere you like (`/opt/GloryBot` on Linux or `C:\Program Files\GloryBot` on Windows will do)
4. Open the terminal in project's folder and create a virtual environment: `python -m venv .venv`
5. Activate the virtual environment: `.venv/bin/activate` on Linux and `.venv\Scripts\activate.bat` on Windows
6. Install all requirements: `pip install requirements.txt`
7. Create a config file by copying the `config_example.json` to `config.json`: `cp config_example.json config.json`
8. Configure the bot's `config.json` using your favorite text editor
9. Run the bot: `python main.py`
### From source
1. Install all [requirements](#requirements)
2. Go to the directory of your liking and open a terminal there: For example, `/opt` on Linux or `C:\Program Files` on Windows
3. Clone the repository: `git clone https://git.end-play.xyz/VA-11_Hall-A/GloryBot.git`
4. Change directory to the cloned project: `cd GloryBot`
5. Open the terminal in project's folder and create a virtual environment: `python -m venv .venv`
6. Activate the virtual environment: `.venv/bin/activate` on Linux and `.venv\Scripts\activate.bat` on Windows
7. Install all requirements: `pip install requirements.txt`
8. Create a config file by copying the `config_example.json` to `config.json`: `cp config_example.json config.json`
9. Configure the bot's `config.json` using your favorite text editor
10. Run the bot: `python main.py`
## Upgrading
To-Do
## Creating a service
To-Do

View File

@ -1 +1,11 @@
from .member import MemberNotFoundError from .check import (
CheckAlreadyAssignedError,
CheckChallengeNotFoundError,
CheckNotFoundError,
)
from .guild import (
GuildAlreadyExistsError,
GuildChallengesEmptyError,
GuildNotFoundError,
)
from .member import MemberAlreadyExistsError, MemberNotFoundError

58
classes/errors/check.py Normal file
View File

@ -0,0 +1,58 @@
from bson import ObjectId
class CheckNotFoundError(Exception):
"""Exception raised when check does not exist in a database.
### Attributes:
* guild (`ObjectId`): Member's guild.
* member (`ObjectId`): Member ID.
"""
def __init__(self, guild: ObjectId, member: ObjectId):
self.guild: ObjectId = guild
self.member: ObjectId = member
super().__init__(
f"Could not find a check for member {str(member)} in the guild with id {str(guild)}."
)
def __str__(self):
return f"Could not find a check for member {str(self.member)} in the guild with id {str(self.guild)}."
class CheckAlreadyAssignedError(Exception):
"""Exception raised when a member already has an active check on their record.
### Attributes:
* guild (`ObjectId`): Member's guild.
* member (`ObjectId`): Member ID.
"""
def __init__(self, guild: ObjectId, member: ObjectId):
self.guild: ObjectId = guild
self.member: ObjectId = member
super().__init__(
f"Member {str(member)} in the guild with id {str(guild)} already has an active check on record."
)
def __str__(self):
return f"Member {str(self.member)} in the guild with id {str(self.guild)} already has an active check on record."
class CheckChallengeNotFoundError(Exception):
"""Exception raised when challenge of the check cannot be found in a database.
### Attributes:
* check (`ObjectId`): Check's ID.
* challenge (`ObjectId`): Challenge's ID.
"""
def __init__(self, check: ObjectId, challenge: ObjectId):
self.check: ObjectId = check
self.challenge: ObjectId = challenge
super().__init__(
f"Challenge {str(challenge)} of the check {str(check)} could not be found."
)
def __str__(self):
return f"Challenge {str(self.challenge)} of the check {str(self.check)} could not be found."

48
classes/errors/guild.py Normal file
View File

@ -0,0 +1,48 @@
from typing import Union
from bson import ObjectId
class GuildNotFoundError(Exception):
"""Exception raised when guild does not exist in a database.
### Attributes:
* id (`Union[int, ObjectId]`): Guild ID.
"""
def __init__(self, id: Union[int, ObjectId]):
self.id: Union[int, ObjectId] = id
super().__init__(f"Could not find guild entry for {str(id)}.")
def __str__(self):
return f"Could not find guild entry for {str(self.id)}."
class GuildAlreadyExistsError(Exception):
"""Exception raised when guild already exists in a database.
### Attributes:
* id (`Union[int, ObjectId]`): Guild ID.
"""
def __init__(self, id: Union[int, ObjectId]):
self.id: Union[int, ObjectId] = id
super().__init__(f"There already is a database entry for {str(id)}.")
def __str__(self):
return f"There already is a database entry for {str(self.id)}."
class GuildChallengesEmptyError(Exception):
"""Exception raised when guild has no active challenges.
### Attributes:
* id (`ObjectId`): Guild ID.
"""
def __init__(self, id: ObjectId):
self.id: ObjectId = id
super().__init__(f"Guild {str(id)} has no active challenges.")
def __str__(self):
return f"Guild {str(self.id)} has no active challenges."

View File

@ -1,4 +1,5 @@
from typing import Union from typing import Union
from bson import ObjectId from bson import ObjectId
@ -6,16 +7,35 @@ class MemberNotFoundError(Exception):
"""Exception raised when member does not exist in a database. """Exception raised when member does not exist in a database.
### Attributes: ### Attributes:
* id: Member ID. * id (`Union[int, ObjectId]`): Member ID.
* guild: Member's guild. * guild (`ObjectId`): Member's guild.
""" """
def __init__(self, id: Union[int, ObjectId], guild: ObjectId): def __init__(self, id: Union[int, ObjectId], guild: ObjectId):
self.id = id self.id: Union[int, ObjectId] = id
self.guild = guild self.guild: ObjectId = guild
super().__init__( super().__init__(
f"Could not find member entry for {str(id)} in the guild with id {str(guild)}." f"Could not find member entry for {str(id)} in the guild with id {str(guild)}."
) )
def __str__(self): def __str__(self):
return f"Could not find member entry for {str(self.id)} in the guild with id {str(self.guild)}." return f"Could not find member entry for {str(self.id)} in the guild with id {str(self.guild)}."
class MemberAlreadyExistsError(Exception):
"""Exception raised when attempted member creation when it already exists in a database.
### Attributes:
* id (`Union[int, ObjectId]`): Member ID.
* guild (`ObjectId`): Member's guild.
"""
def __init__(self, id: Union[int, ObjectId], guild: ObjectId):
self.id: Union[int, ObjectId] = id
self.guild: ObjectId = guild
super().__init__(
f"Member entry for {str(id)} in the guild with id {str(guild)} already exists."
)
def __str__(self):
return f"Member entry for {str(self.id)} in the guild with id {str(self.guild)} already exists."

84
classes/pycord_bot.py Normal file
View File

@ -0,0 +1,84 @@
from typing import Any, Union
from aiohttp import ClientSession
from bson import ObjectId
from libbot.pycord.classes import PycordBot as LibPycordBot
from classes.enums import MemberStatus
from classes.errors import GuildNotFoundError, MemberNotFoundError
from classes.pycord_guild import PycordGuild
from classes.pycord_member import PycordMember
class PycordBot(LibPycordBot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client_session = ClientSession()
if self.scheduler is None:
return
async def find_guild(
self,
id: Union[int, ObjectId],
create: bool = False,
) -> PycordGuild:
"""Find guild by their ID.
### Args:
* id (`Union[int, ObjectId]`): Guild's ID
* create (`bool`): Create DB entry if does not exist. Defaults to `False`.
### Raises:
* `GuildNotFoundError`: Raised when guild entry could not be found.
### Returns:
* `PycordGuild`: Guild in database representation.
"""
try:
guild = await PycordGuild.find(id)
except GuildNotFoundError as exc:
if create and isinstance(id, int):
return await PycordGuild.create(id)
else:
raise exc from exc
return guild
async def find_member(
self,
id: Union[int, ObjectId],
guild: ObjectId,
create: bool = False,
) -> PycordMember:
"""Find member by their ID and guild.
### Args:
* id (`Union[int, ObjectId]`): Member's ID
* guild (`ObjectId`): Discord guild's database ID
* create (`bool`): Create DB entry if does not exist. Defaults to `False`.
### Raises:
* `MemberNotFoundError`: Raised when member entry could not be found.
### Returns:
* `PycordMember`: Member in database representation.
"""
try:
member = await PycordMember.find(id, guild)
except MemberNotFoundError as exc:
if create and isinstance(id, int):
return await PycordMember.create(id, guild, MemberStatus.UNVERIFIED)
else:
raise exc from exc
return member
async def close(self, *args: Any, **kwargs: Any) -> None:
await self.client_session.close()
if self.scheduler is not None:
self.scheduler.shutdown()
await super().close(*args, **kwargs)

146
classes/pycord_challenge.py Normal file
View File

@ -0,0 +1,146 @@
import logging
from dataclasses import dataclass
from re import IGNORECASE, Match, compile
from typing import List, Union
from bson import ObjectId, Regex
from modules.database import col_challenges
logger = logging.getLogger(__name__)
@dataclass
class PycordChallenge:
"""Dataclass of DB entry of a captcha challenge"""
__slots__ = (
"_id",
"guild",
"enabled",
"archived",
"challenge",
"answers",
)
_id: ObjectId
guild: ObjectId
enabled: bool
archived: bool
challenge: str
answers: List[Regex]
@classmethod
async def create(
cls,
guild: ObjectId,
challenge: str,
answers: List[Regex],
enabled: bool = True,
archived: bool = False,
):
"""Create new challenge entry in a database.
### Args:
* guild (`ObjectId`): Guild's ID
* challenge (`str`): Challenge text
* answers (`List[Regex]`): List of regex patterns that are answers to the challenge
* enabled (`bool`, *optional*): Whether the challenge is enabled. Defaults to `True`.
* archived (`bool`, *optional*): Whether the challenge is archived. Defaults to `False`.
### Returns:
* `PycordChallenge`: The challenge object
"""
challenge_entry = {
"guild": guild,
"enabled": enabled,
"archived": archived,
"challenge": challenge,
"answers": answers,
}
inserted = await col_challenges.insert_one(challenge_entry)
challenge_entry["_id"] = inserted.inserted_id
return cls(**challenge_entry)
def solve(self, expression: str) -> bool:
"""Check if the expression is an answer to the challenge
### Args:
* expression (`str`): Some expression to be matched against the challenge's answers.
### Returns:
* `bool`: `True` if the answer is correct and `False` if not.
"""
return bool(self.match(expression))
def match(self, expression: str) -> Union[Match, None]:
"""Match the expression against challenge's answers
### Args:
* expression (`str`): Some expression to be matched against the challenge's answers.
### Returns:
* `Union[Match, None]`: Regex `Match` if a match is found, otherwise `None`.
"""
for answer in self.answers:
if re_match := compile(answer.pattern, IGNORECASE).match(
expression.strip()
):
return re_match
return None
async def enable(self, unarchive: bool = True) -> None:
"""Enable and unarchive the challenge
### Args:
* unarchive (`bool`, *optional*): Whether to unarchive the challenge as well. Defaults to `True`.
"""
self.enabled = True
self.archived = unarchive if self.archived else False
await col_challenges.update_one(
{"_id": self._id},
{
"$set": {
"enabled": True,
"archived": unarchive if self.archived else False,
}
},
)
async def disable(self) -> None:
"""Disable the challenge"""
self.enabled = False
await col_challenges.update_one({"_id": self._id}, {"$set": {"enabled": False}})
async def archive(self, disable: bool = True) -> None:
"""Archive and disable the challenge
### Args:
* disable (`bool`, *optional*): Whether to disable the challenge as well. Defaults to `True`.
"""
self.enabled = not disable if self.enabled else False
self.archived = True
await col_challenges.update_one(
{"_id": self._id},
{
"$set": {
"enabled": not disable if self.enabled else False,
"archived": True,
}
},
)
async def unarchive(self) -> None:
"""Unarchive the challenge"""
self.archived = False
await col_challenges.update_one(
{"_id": self._id}, {"$set": {"archived": False}}
)

125
classes/pycord_check.py Normal file
View File

@ -0,0 +1,125 @@
import logging
from dataclasses import dataclass
from datetime import UTC, datetime
from random import choice
from typing import Optional
from bson import ObjectId
from classes.errors import (
CheckAlreadyAssignedError,
CheckChallengeNotFoundError,
CheckNotFoundError,
GuildChallengesEmptyError,
)
from classes.pycord_challenge import PycordChallenge
from modules.database import col_challenges, col_checks
logger = logging.getLogger(__name__)
@dataclass
class PycordCheck:
"""Dataclass of DB entry of a security check"""
__slots__ = (
"_id",
"guild",
"active",
"thread_id",
"member",
"date_created",
"date_modified",
"challenge",
)
_id: ObjectId
guild: ObjectId
active: bool
thread_id: int
member: ObjectId
date_created: datetime
date_modified: datetime
challenge: ObjectId
@classmethod
async def find(
cls,
guild: ObjectId,
member: ObjectId,
):
db_entry = await col_checks.find_one(
{"active": True, "guild": guild, "member": member}
)
if not db_entry:
raise CheckNotFoundError(guild, member)
return cls(**db_entry)
@classmethod
async def create(
cls,
guild: ObjectId,
thread_id: int,
member: ObjectId,
challenge: Optional[ObjectId] = None,
):
# Check whether active check already exists
if await col_checks.find_one(
{"active": True, "guild": guild, "member": member}
):
raise CheckAlreadyAssignedError(guild, member)
# Get all enabled guild challenges
guild_challenges = [
challenge
async for challenge in col_challenges.find(
{"guild": guild, "enabled": True, "archived": False}
)
]
# Check whether guild has challenges at all
if not challenge and not guild_challenges:
raise GuildChallengesEmptyError(guild)
# Create a check dict
check = {
"guild": guild,
"active": True,
"thread_id": thread_id,
"member": member,
"date_created": datetime.now(UTC),
"date_modified": datetime.now(UTC),
"challenge": challenge or choice(guild_challenges)["_id"],
}
# Insert the check into the database
inserted = await col_checks.insert_one(check)
check["_id"] = inserted.inserted_id
return cls(**check)
async def deactivate(self) -> None:
"""Deactivate the check"""
await col_checks.update_one(
{"_id": self._id},
{"$set": {"active": False, "date_modified": datetime.now(UTC)}},
)
async def get_challenge(self) -> PycordChallenge:
"""Get check's bound challenge
### Raises:
* `CheckChallengeNotFoundError`: Challenge could not be found
### Returns:
* `PycordChallenge`: Captcha challenge
"""
challenge = await col_challenges.find_one({"_id": self.challenge})
if not challenge:
raise CheckChallengeNotFoundError(self._id, self.challenge)
return PycordChallenge(**challenge)

93
classes/pycord_guild.py Normal file
View File

@ -0,0 +1,93 @@
import logging
from dataclasses import dataclass
from typing import List, Union
from bson import ObjectId, Regex
from classes.errors import GuildAlreadyExistsError, GuildNotFoundError
from classes.pycord_challenge import PycordChallenge
from classes.pycord_guild_channels import PycordGuildChannels
from classes.pycord_guild_roles import PycordGuildRoles
from modules.database import col_guilds
logger = logging.getLogger(__name__)
@dataclass
class PycordGuild:
"""Dataclass of DB entry of a member"""
__slots__ = ("_id", "id", "channels", "roles")
_id: ObjectId
id: int
channels: PycordGuildChannels
roles: PycordGuildRoles
@classmethod
async def find(cls, id: Union[int, ObjectId]):
"""Find guild in the database.
### Args:
* id (`Union[int, ObjectId]`): Guild's Discord ID
### Raises:
* `GuildNotFoundError`: Raised when guild entry could not be found.
### Returns:
* `PycordGuild`: Guild with its database data.
"""
db_entry = await col_guilds.find_one(
({"id": id} if isinstance(id, int) else {"_id": id})
)
if not db_entry:
raise GuildNotFoundError(id)
db_entry["channels"] = PycordGuildChannels(**db_entry["channels"])
db_entry["roles"] = PycordGuildRoles(**db_entry["roles"])
return cls(**db_entry)
@classmethod
async def create(
cls,
id: int,
channels: PycordGuildChannels = PycordGuildChannels(),
roles: PycordGuildRoles = PycordGuildRoles(),
):
if await col_guilds.find_one({"id": id}):
raise GuildAlreadyExistsError(id)
guild = {"id": id, "channels": channels.dict(), "roles": roles.dict()}
inserted = await col_guilds.insert_one(guild)
guild["_id"] = inserted.inserted_id
return cls(**guild)
def is_valid(self) -> bool:
"""Check if all attributes are valid and return boolean of that.
### Returns:
* `bool`: `True` if all attributes are valid and `False` if not
"""
return bool(self.id and self.channels.is_valid() and self.roles.is_valid())
async def add_challenge(
self, challenge: str, answers: List[Regex], enabled: bool
) -> PycordChallenge:
"""Create new guild's challenge entry in a database.
### Args:
* challenge (`str`): Challenge text
* answers (`List[Regex]`): List of regex patterns that are answers to the challenge
* enabled (`bool`, *optional*): Whether the challenge is enabled. Defaults to `True`.
### Returns:
* `PycordChallenge`: The challenge object
"""
return await PycordChallenge.create(
self._id, challenge, answers, enabled=enabled
)

View File

@ -0,0 +1,31 @@
from dataclasses import asdict, dataclass
from typing import Optional, Union
@dataclass
class PycordGuildChannels:
__slots__ = ("captcha", "logging", "welcome")
captcha: Union[int, None]
logging: Union[int, None]
welcome: Union[int, None]
def __init__(
self,
captcha: Optional[int] = None,
logging: Optional[int] = None,
welcome: Optional[int] = None,
) -> None:
self.captcha = captcha
self.logging = logging
self.welcome = welcome
def dict(self):
return {key: value for key, value in asdict(self).items()}
def is_valid(self) -> bool:
"""Check if all attributes are not `None` and return boolean of that.
### Returns:
* `bool`: `True` if all attributes are there and `False` if not
"""
return bool(self.captcha and self.logging and self.welcome)

View File

@ -0,0 +1,42 @@
from dataclasses import asdict, dataclass
from typing import Optional, Union
@dataclass
class PycordGuildRoles:
__slots__ = (
"moderator",
"verified",
"failed",
"additional",
)
moderator: Union[int, None]
verified: Union[int, None]
failed: Union[int, None]
additional: Union[int, None]
def __init__(
self,
moderator: Optional[int] = None,
verified: Optional[int] = None,
failed: Optional[int] = None,
additional: Optional[int] = None,
) -> None:
self.moderator = moderator
self.verified = verified
self.failed = failed
self.additional = additional
def dict(self):
return {key: value for key, value in asdict(self).items()}
def is_valid(self) -> bool:
"""Check if all attributes are not `None` and return boolean of that.
### Returns:
* `bool`: `True` if all attributes are there and `False` if not
"""
return bool(
self.moderator and self.verified and self.failed and self.additional
)

View File

@ -6,8 +6,8 @@ from bson import ObjectId
from discord import Thread from discord import Thread
from classes.enums import MemberStatus from classes.enums import MemberStatus
from classes.errors import MemberNotFoundError from classes.errors import MemberAlreadyExistsError, MemberNotFoundError
from classes.pycordcheck import PycordCheck from classes.pycord_check import PycordCheck
from modules.database import col_members from modules.database import col_members
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -31,14 +31,14 @@ class PycordMember:
@classmethod @classmethod
async def find(cls, id: Union[int, ObjectId], guild: ObjectId): async def find(cls, id: Union[int, ObjectId], guild: ObjectId):
"""Find a member in a database. """Find member in the database.
### Args: ### Args:
* id (`Union[int, ObjectId]`): Member's Discord ID * id (`Union[int, ObjectId]`): Member's Discord ID
* guild (`ObjectId`): Discord guild's database ID * guild (`ObjectId`): Discord guild's database ID
### Raises: ### Raises:
* `MemberNotFoundError`: Raised when member entry after insertion could not be found. * `MemberNotFoundError`: Raised when member entry could not be found.
### Returns: ### Returns:
* `PycordMember`: Member with its database data. * `PycordMember`: Member with its database data.
@ -51,7 +51,7 @@ class PycordMember:
) )
) )
if db_entry is None: if not db_entry:
raise MemberNotFoundError(id, guild) raise MemberNotFoundError(id, guild)
return cls(**db_entry) return cls(**db_entry)
@ -68,10 +68,37 @@ class PycordMember:
MemberStatus.ADDITIONAL, MemberStatus.ADDITIONAL,
], ],
): ):
pass # Check whether member already exists
if await col_members.find_one({"id": True, "guild": guild}):
raise MemberAlreadyExistsError(id, guild)
async def get_check(self, guild: ObjectId) -> PycordCheck: # Create a member dict
pass check = {
"id": id,
"guild": guild,
"status": status.value,
}
# Insert the member into the database
inserted = await col_members.insert_one(check)
check["_id"] = inserted.inserted_id
return cls(**check)
async def new_check(self, thread_id: int) -> PycordCheck:
return await PycordCheck.create(self.guild, thread_id, self._id)
async def get_check(self) -> PycordCheck:
"""Get an ongoing check
### Returns:
* `PycordCheck`: An ongoing check
### Raises:
* `CheckNotFoundError`: Member does not have an ongoing check
"""
return await PycordCheck.find(self.guild, self._id)
async def set_status( async def set_status(
self, self,
@ -82,4 +109,5 @@ class PycordMember:
MemberStatus.ADDITIONAL, MemberStatus.ADDITIONAL,
], ],
) -> Union[Thread, None]: ) -> Union[Thread, None]:
pass await col_members.update_one({"_id": self._id}, {"$set": {status.value}})
self.status = status

View File

@ -1,43 +0,0 @@
from typing import Any, Union
from aiohttp import ClientSession
from bson import ObjectId
from libbot.pycord.classes import PycordBot as LibPycordBot
from classes.pycordmember import PycordMember
class PycordBot(LibPycordBot):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.client_session = ClientSession()
if self.scheduler is None:
return
async def find_user(
self, id: Union[int, ObjectId], guild: ObjectId
) -> PycordMember:
"""Find member by their ID and guild.
### Args:
* id (`Union[int, ObjectId]`): Member's Discord ID
* guild (`ObjectId`): Discord guild's database ID
### Raises:
* `MemberNotFoundError`: Raised when member entry after insertion could not be found.
### Returns:
* `PycordMember`: Member in database representation.
"""
return await PycordMember.find(id, guild)
async def close(self, *args: Any, **kwargs: Any) -> None:
await self.client_session.close()
if self.scheduler is not None:
self.scheduler.shutdown()
await super().close(*args, **kwargs)

View File

@ -1,36 +0,0 @@
from datetime import datetime
import logging
from dataclasses import dataclass
from typing import List
from bson import ObjectId
from bson.regex import Regex
from modules.database import col_checks
logger = logging.getLogger(__name__)
@dataclass
class PycordCheck:
"""Dataclass of DB entry of a security check"""
__slots__ = (
"_id",
"guild",
"thread_id",
"member",
"date_created",
"date_modified",
"challenge",
"answers",
)
_id: ObjectId
guild: ObjectId
thread_id: int
member: ObjectId
date_created: datetime
date_modified: datetime
challenge: str
answers: List[Regex]

View File

@ -1,9 +0,0 @@
from dataclasses import dataclass
@dataclass
class PycordGuildColors:
default: str
success: str
warning: str
error: str

164
cogs/cog_member_join.py Normal file
View File

@ -0,0 +1,164 @@
import logging
from typing import Union
from discord import Cog, Member, TextChannel, Thread
from discord.abc import GuildChannel
from classes.errors import (
CheckAlreadyAssignedError,
CheckNotFoundError,
GuildChallengesEmptyError,
GuildNotFoundError,
)
from classes.pycord_bot import PycordBot
from classes.pycord_challenge import PycordChallenge
from classes.pycord_check import PycordCheck
from classes.pycord_guild import PycordGuild
from classes.pycord_member import PycordMember
logger = logging.getLogger(__name__)
class CogMemberJoin(Cog):
def __init__(self, client: PycordBot) -> None:
super().__init__()
self.client: PycordBot = client
@Cog.listener()
async def on_member_join(self, member: Member):
"""Process the event of new or returning member joining the guild.
This handler must take care of the new users, returning verified as well as returning banned users.
Guild must exist and be set up correctly in order to use this handler and process the input.
### Args:
* member (`Member`): Member that has joined the guild
"""
# Check whether the guild exists in the database
# We should not create it here when it does not exist. Config will be empty
try:
pycord_guild: PycordGuild = await self.client.find_guild(member.guild.id)
except GuildNotFoundError:
# Let's notify guild admins about this
logger.error(
"Guild %s is not set up properly: guild is missing from the database.",
member.guild.id,
)
return
# Get the member and create if it does not exist
pycord_member: PycordMember = await self.client.find_member(
member.id, pycord_guild._id, create=True
)
# Check whether guild is set up properly
if not pycord_guild.is_valid() or pycord_guild.channels.captcha is None:
# Let's notify guild admins about this
logger.error(
"Guild %s is not set up properly: check %s in database for details.",
member.guild.id,
pycord_guild._id,
)
return
# Check whether member already has an active check
try:
await pycord_member.get_check()
# Let's notify guild admins and user about this
logger.error(
"Member %s of guild %s could not get a check: user already has an active check.",
member.id,
member.guild.id,
)
return
except CheckNotFoundError:
logger.info(
"Member %s of guild %s does bot have a check yet.",
member.id,
member.guild.id,
)
captcha_channel: Union[GuildChannel, None] = member.guild.get_channel(
pycord_guild.channels.captcha
)
# Check whether the channel exists and is a text channel
if captcha_channel is None or not isinstance(captcha_channel, TextChannel):
logger.error(
"Captcha channel of the guild %s either does not exist or is incorrect."
)
return
# Try creating a thread
try:
captcha_thread: Thread = await captcha_channel.create_thread(
name=f"Verification - @{member.name}",
invitable=False,
reason=f"Verification - @{member.name} ({member.id})",
)
except Exception as exc:
logger.error(
"Could not create captcha thread for the channel %s in guild %s (%s) due to %s",
captcha_channel.id,
pycord_guild.id,
pycord_guild._id,
exc,
)
return
try:
check: PycordCheck = await pycord_member.new_check(captcha_thread.id)
challenge: PycordChallenge = await check.get_challenge()
except GuildChallengesEmptyError:
logger.error(
"Guild %s (%s) has no active challenges, check is aborted.",
pycord_guild.id,
pycord_guild._id,
)
await captcha_thread.delete()
return
except CheckAlreadyAssignedError:
logger.error(
"Member %s of the guild %s (%s) already has an active check, check is aborted.",
member.id,
pycord_guild.id,
pycord_guild._id,
)
await captcha_thread.delete()
return
try:
await captcha_thread.send(
f"Welcome, {member.mention}!\n\nTo keep this Discord server safe, we need to verify that you are a human.\n\nPlease, complete the sentence below:\n`{challenge.challenge} ...`\n\nUse the command `/verify` and the verification answer as its argument to pass the check."
)
except Exception as exc:
logger.error(
"Could not send the challenge to the thread %s in guild %s (%s) due to %s",
captcha_thread.id,
pycord_guild.id,
pycord_guild._id,
exc,
)
return
try:
await captcha_channel.send(
f"Welcome, {member.mention}! Please proceed to {captcha_thread.mention} in order to complete the verification.",
delete_after=180,
)
except Exception as exc:
logger.error(
"Could not send a notification about the challenge to the channel %s in guild %s (%s) due to %s",
captcha_channel.id,
pycord_guild.id,
pycord_guild._id,
exc,
)
return
def setup(client: PycordBot):
client.add_cog(CogMemberJoin(client))

52
cogs/cog_member_remove.py Normal file
View File

@ -0,0 +1,52 @@
import logging
from discord import Cog, Member
from classes.enums import MemberStatus
from classes.errors import GuildNotFoundError, MemberNotFoundError
from classes.pycord_bot import PycordBot
from classes.pycord_guild import PycordGuild
from classes.pycord_member import PycordMember
logger = logging.getLogger(__name__)
class CogMemberRemove(Cog):
def __init__(self, client: PycordBot) -> None:
super().__init__()
self.client = client
@Cog.listener()
async def on_member_remove(self, member: Member):
# Check whether the guild exists in the database
# We should not create it here when it does not exist. Config will be empty
try:
pycord_guild: PycordGuild = await self.client.find_guild(member.guild.id)
except GuildNotFoundError:
# Let's notify guild admins about this
logger.error(
"Guild %s is not set up properly: guild is missing from the database.",
member.guild.id,
)
return
# Get the member
try:
pycord_member: PycordMember = await self.client.find_member(
member.id, pycord_guild._id
)
except MemberNotFoundError:
return
# Verify whether a member has an active check
if check := await pycord_member.get_check():
# Update member's status
await pycord_member.set_status(MemberStatus.FAILED)
# Delete the thread (if possible)
if thread := member.guild.get_thread(check.thread_id):
await thread.delete()
def setup(client: PycordBot):
client.add_cog(CogMemberRemove(client))

152
cogs/cog_verify.py Normal file
View File

@ -0,0 +1,152 @@
import logging
from discord import ApplicationContext, Cog, option
from discord.ext import commands
from classes.enums import MemberStatus
from classes.errors import (
CheckChallengeNotFoundError,
CheckNotFoundError,
GuildNotFoundError,
MemberNotFoundError,
)
from classes.pycord_bot import PycordBot
logger = logging.getLogger(__name__)
class CogVerifySetup(Cog):
def __init__(self, client: PycordBot) -> None:
super().__init__()
self.client = client
@commands.slash_command(
name="verify", description="Submit verification check answer"
)
@option("answer", description="Answer to the challenge")
async def command_verify(self, ctx: ApplicationContext, answer: str):
# Verify that this command has been called in a guild
if ctx.guild is None:
await ctx.respond(
"This command can only be used in a guild.", ephemeral=True
)
return
# Verify that the guild exists and is configured
try:
pycord_guild = await self.client.find_guild(ctx.guild.id)
if not pycord_guild.is_valid():
raise GuildNotFoundError(pycord_guild.id)
except GuildNotFoundError:
await ctx.respond(
"This guild has not been set up for checks.", ephemeral=True
)
return
# Verify that the member is already known
try:
pycord_member = await self.client.find_member(ctx.user.id, pycord_guild._id)
except MemberNotFoundError:
await ctx.respond("There's no check on your record.", ephemeral=True)
return
# Verify that the member has an active check
try:
check = await pycord_member.get_check()
except CheckNotFoundError:
await ctx.respond("You do not have an active check.", ephemeral=True)
return
# Verify that the check has a valid challenge
try:
challenge = await check.get_challenge()
except CheckChallengeNotFoundError:
await ctx.respond(
"Your check does not have a valid channel.", ephemeral=True
)
return
# If the answer provided is incorrect
if not challenge.solve(answer):
logger.info(
"Member %s of guild %s has failed a check %s (challenge %s) with answer '%s'",
pycord_member.id,
pycord_guild.id,
check._id,
challenge._id,
answer,
)
# Update member's status
await pycord_member.set_status(MemberStatus.ADDITIONAL)
await ctx.respond("Your answer to the challenge is incorrect.")
# Assign the member an additional verification role
try:
member = ctx.guild.get_member(ctx.user.id)
additional_role = ctx.guild.get_role(pycord_guild.roles.additional) # type: ignore
await member.add_roles(additional_role)
except Exception as exc:
logger.error(
"Could not give role %s to %s of guild %s after passing the check due to %s",
pycord_guild.roles.verified,
pycord_member.id,
pycord_guild.id,
exc,
)
moderator_role = ctx.guild.get_role(pycord_guild.roles.moderator) # type: ignore
# Notify moderators (if the thread is available)
if thread := ctx.guild.get_thread(check.thread_id):
await thread.send(f"{moderator_role.mention} ACTION REQUIRED")
return
logger.info(
"Member %s of guild %s has passed a check %s (challenge %s) with answer '%s'",
pycord_member.id,
pycord_guild.id,
check._id,
challenge._id,
answer,
)
# Deactivate the check and update member's status
await check.deactivate()
await pycord_member.set_status(MemberStatus.VERIFIED)
await ctx.respond("You have passed the challenge!")
# Assign the member a verified role
try:
member = ctx.guild.get_member(ctx.user.id)
verified_role = ctx.guild.get_role(pycord_guild.roles.verified) # type: ignore
await member.add_roles(verified_role)
except Exception as exc:
logger.error(
"Could not give role %s to %s of guild %s after passing the check due to %s",
pycord_guild.roles.verified,
pycord_member.id,
pycord_guild.id,
exc,
)
# Delete the thread (if possible)
if thread := ctx.guild.get_thread(check.thread_id):
try:
await thread.delete()
except Exception as exc:
logger.error(
"Could not delete thread %s after the check of %s in guild %s due to %s",
thread.id,
pycord_member.id,
pycord_guild.id,
exc,
)
def setup(client: PycordBot):
client.add_cog(CogVerifySetup(client))

View File

@ -5,7 +5,7 @@ from os import getpid
from discord import Intents from discord import Intents
from libbot import sync from libbot import sync
from classes.pycordbot import PycordBot from classes.pycord_bot import PycordBot
from modules.scheduler import scheduler from modules.scheduler import scheduler
logging.basicConfig( logging.basicConfig(
@ -18,7 +18,12 @@ logger = logging.getLogger(__name__)
async def main(): async def main():
bot = PycordBot(scheduler=scheduler, intents=Intents.all()) # Define the bot's intents
intents = Intents.default()
intents.members = True
# Create a bot connection object
bot = PycordBot(scheduler=scheduler, intents=intents)
bot.load_extension("cogs") bot.load_extension("cogs")

View File

@ -1,4 +1,3 @@
from modules.migrator import migrate_database from modules.migrator import migrate_database
migrate_database() migrate_database()

View File

@ -26,3 +26,4 @@ db: AsyncDatabase = db_client.get_database(name=db_config["name"])
col_guilds: AsyncCollection = db.get_collection("guilds") col_guilds: AsyncCollection = db.get_collection("guilds")
col_checks: AsyncCollection = db.get_collection("checks") col_checks: AsyncCollection = db.get_collection("checks")
col_members: AsyncCollection = db.get_collection("members") col_members: AsyncCollection = db.get_collection("members")
col_challenges: AsyncCollection = db.get_collection("challenges")

View File

@ -1,6 +1,6 @@
apscheduler~=3.10.4 apscheduler~=3.10.4
mongodb-migrations==1.3.0 async_pymongo==0.1.5
mongodb-migrations==1.3.1
pytz~=2024.1 pytz~=2024.1
--extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple --extra-index-url https://git.end-play.xyz/api/packages/profitroll/pypi/simple
async_pymongo==0.1.4 libbot[speed,pycord]==3.2.2
libbot[speed,pycord]==3.0.0

View File

@ -189,7 +189,8 @@
], ],
"relations": [ "relations": [
"0c5b44eb-1d1a-4c61-aa95-a5e4e71095f1", "0c5b44eb-1d1a-4c61-aa95-a5e4e71095f1",
"66940f28-fcde-4d2a-9012-ba7e6bd626e8" "66940f28-fcde-4d2a-9012-ba7e6bd626e8",
"ce5b0f4b-3630-4594-adde-66bcfefa429b"
], ],
"lines": [], "lines": [],
"keys": [ "keys": [
@ -220,6 +221,17 @@
"colid": "90dab62e-469d-4eaa-b69c-cbee4dcc46ca" "colid": "90dab62e-469d-4eaa-b69c-cbee4dcc46ca"
} }
] ]
},
{
"id": "d9263cdc-91da-487e-9eeb-43f4b78353a7",
"isPk": false,
"name": "guilds_ai_3",
"cols": [
{
"id": "911d57c1-15f4-4631-930d-7aaab2ce620f",
"colid": "90dab62e-469d-4eaa-b69c-cbee4dcc46ca"
}
]
} }
], ],
"indexes": [], "indexes": [],
@ -278,6 +290,22 @@
"any": "", "any": "",
"fk": true "fk": true
}, },
{
"id": "30eedfb7-f063-463e-a4fb-bd66ff0d93a6",
"name": "active",
"datatype": "bool",
"param": "",
"pk": false,
"nn": false,
"list": false,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
},
{ {
"id": "89ff6268-aaf0-4d94-8b23-af84af81d233", "id": "89ff6268-aaf0-4d94-8b23-af84af81d233",
"name": "thread_id", "name": "thread_id",
@ -346,7 +374,7 @@
{ {
"id": "d3b8a5d5-aecf-4c63-af11-2d3c6f1cdef3", "id": "d3b8a5d5-aecf-4c63-af11-2d3c6f1cdef3",
"name": "challenge", "name": "challenge",
"datatype": "string", "datatype": "objectId",
"param": "", "param": "",
"pk": false, "pk": false,
"nn": false, "nn": false,
@ -357,28 +385,14 @@
"validation": "", "validation": "",
"pattern": false, "pattern": false,
"estimatedSize": "", "estimatedSize": "",
"any": "" "any": "",
}, "fk": true
{
"id": "0f31f5de-1fb1-4635-ac20-b1dc5a75cff6",
"name": "answers",
"datatype": "regex",
"param": "",
"pk": false,
"nn": false,
"list": true,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
} }
], ],
"relations": [ "relations": [
"4fe09974-9056-4780-a40b-0b98427c9806", "4fe09974-9056-4780-a40b-0b98427c9806",
"0c5b44eb-1d1a-4c61-aa95-a5e4e71095f1" "0c5b44eb-1d1a-4c61-aa95-a5e4e71095f1",
"5935cefc-3c00-4333-9e26-595c7f160eab"
], ],
"lines": [], "lines": [],
"keys": [ "keys": [
@ -539,7 +553,7 @@
}, },
{ {
"id": "6e67129f-49ff-4851-b53d-bcad6694a5d3", "id": "6e67129f-49ff-4851-b53d-bcad6694a5d3",
"name": "captcha_verified", "name": "verified",
"datatype": "long", "datatype": "long",
"param": "", "param": "",
"pk": false, "pk": false,
@ -555,7 +569,7 @@
}, },
{ {
"id": "263e0911-d6ab-455f-829a-13402122f573", "id": "263e0911-d6ab-455f-829a-13402122f573",
"name": "captcha_failed", "name": "failed",
"datatype": "long", "datatype": "long",
"param": "", "param": "",
"pk": false, "pk": false,
@ -571,7 +585,7 @@
}, },
{ {
"id": "2789fc3e-d195-4a61-9ccd-5c3bbd1e10b9", "id": "2789fc3e-d195-4a61-9ccd-5c3bbd1e10b9",
"name": "captcha_additional", "name": "additional",
"datatype": "long", "datatype": "long",
"param": "", "param": "",
"pk": false, "pk": false,
@ -611,6 +625,145 @@
"max": "", "max": "",
"validation": "", "validation": "",
"capped": false "capped": false
},
"c630fc75-8913-4fe4-a6c3-5ffdbd43df2a": {
"id": "c630fc75-8913-4fe4-a6c3-5ffdbd43df2a",
"visible": true,
"name": "challenges",
"desc": "",
"estimatedSize": "",
"cols": [
{
"id": "b00292fe-2574-48cc-9c67-c0f56fa7d3be",
"name": "_id",
"datatype": "objectId",
"param": "",
"pk": true,
"nn": true,
"list": false,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
},
{
"id": "b313363d-4901-498d-9440-9c08d87cd6ba",
"name": "guild",
"datatype": "objectId",
"param": "",
"pk": false,
"nn": false,
"list": false,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": "",
"fk": true
},
{
"id": "18999c90-6e9a-480e-bd99-bfaacd87d7a4",
"name": "enabled",
"datatype": "bool",
"param": "",
"pk": false,
"nn": false,
"list": false,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
},
{
"id": "60dda701-bf62-44f8-a69a-2b2b37469501",
"name": "archived",
"datatype": "bool",
"param": "",
"pk": false,
"nn": false,
"list": false,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
},
{
"id": "bcef7db5-49aa-4235-aff5-89ad18c92c21",
"name": "challenge",
"datatype": "string",
"param": "",
"pk": false,
"nn": false,
"list": false,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
},
{
"id": "2c269422-2f36-4e53-915e-e19f9e86d6c8",
"name": "answers",
"datatype": "regex",
"param": "",
"pk": false,
"nn": false,
"list": true,
"comment": "",
"data": "",
"enum": "",
"validation": "",
"pattern": false,
"estimatedSize": "",
"any": ""
}
],
"relations": [
"ce5b0f4b-3630-4594-adde-66bcfefa429b",
"5935cefc-3c00-4333-9e26-595c7f160eab"
],
"lines": [],
"keys": [
{
"id": "88db2516-4e20-4d99-bbff-dd3289f8991e",
"name": "Primary",
"isPk": true,
"cols": [
{
"id": "a9c8a99f-25b0-4817-b990-40461660eeea",
"colid": "b00292fe-2574-48cc-9c67-c0f56fa7d3be"
}
]
}
],
"indexes": [],
"embeddable": false,
"generate": true,
"generateCustomCode": true,
"customCode": "",
"beforeScript": "",
"afterScript": "",
"validationLevel": "na",
"validationAction": "na",
"collation": "",
"others": "",
"size": "",
"max": "",
"validation": "",
"capped": false
} }
}, },
"relations": { "relations": {
@ -694,6 +847,60 @@
"generateCustomCode": true, "generateCustomCode": true,
"customCode": "", "customCode": "",
"relationColor": "transparent" "relationColor": "transparent"
},
"ce5b0f4b-3630-4594-adde-66bcfefa429b": {
"id": "ce5b0f4b-3630-4594-adde-66bcfefa429b",
"visible": true,
"name": "_id_guild",
"desc": "",
"type": "identifying",
"parent_key": "d9263cdc-91da-487e-9eeb-43f4b78353a7",
"parent": "50052cf6-a0b4-4480-b01f-d9067db1442d",
"child": "c630fc75-8913-4fe4-a6c3-5ffdbd43df2a",
"c_mp": "true",
"c_mch": "true",
"c_p": "one",
"c_ch": "many",
"c_cp": "",
"c_cch": "",
"cols": [
{
"id": "fbc7b324-be0c-4c51-b9fa-739929bd3a03",
"parentcol": "90dab62e-469d-4eaa-b69c-cbee4dcc46ca",
"childcol": "b313363d-4901-498d-9440-9c08d87cd6ba"
}
],
"generate": true,
"generateCustomCode": true,
"customCode": "",
"relationColor": "transparent"
},
"5935cefc-3c00-4333-9e26-595c7f160eab": {
"id": "5935cefc-3c00-4333-9e26-595c7f160eab",
"visible": true,
"name": "_id_challenge",
"desc": "",
"type": "identifying",
"parent_key": "88db2516-4e20-4d99-bbff-dd3289f8991e",
"parent": "c630fc75-8913-4fe4-a6c3-5ffdbd43df2a",
"child": "6e055f35-bf3f-4561-8a6c-e0420c36e4ef",
"c_mp": "true",
"c_mch": "true",
"c_p": "one",
"c_ch": "many",
"c_cp": "",
"c_cch": "",
"cols": [
{
"id": "efd56c26-bf26-4a41-9768-42a0017945d1",
"parentcol": "b00292fe-2574-48cc-9c67-c0f56fa7d3be",
"childcol": "d3b8a5d5-aecf-4c63-af11-2d3c6f1cdef3"
}
],
"generate": true,
"generateCustomCode": true,
"customCode": "",
"relationColor": "transparent"
} }
}, },
"notes": {}, "notes": {},
@ -732,7 +939,7 @@
"def_collation": "", "def_collation": "",
"def_others": "", "def_others": "",
"connectionVersion": "", "connectionVersion": "",
"lastSaved": 1714494820523 "lastSaved": 1714861487360
}, },
"otherObjects": {}, "otherObjects": {},
"diagrams": { "diagrams": {
@ -812,6 +1019,19 @@
"autoExpand": true, "autoExpand": true,
"backgroundOpacity": "10", "backgroundOpacity": "10",
"collapsed": false "collapsed": false
},
"c630fc75-8913-4fe4-a6c3-5ffdbd43df2a": {
"referencedItemId": "c630fc75-8913-4fe4-a6c3-5ffdbd43df2a",
"x": 622,
"y": 286,
"gHeight": 135,
"gWidth": 200,
"color": "#ffffff",
"background": "#03a9f4",
"resized": false,
"autoExpand": true,
"backgroundOpacity": "10",
"collapsed": false
} }
}, },
"scroll": { "scroll": {