import contextlib import logging from datetime import datetime, timedelta from apscheduler.jobstores.base import JobLookupError from pykeyboard import InlineButton, InlineKeyboard from pyrogram import filters from pyrogram.types import CallbackQuery from classes.callbacks import CallbackVerify from classes.pyroclient import PyroClient from classes.pyrogroup import PyroGroup from modules.database import col_schedule from modules.kicker import kick_unverified from modules.utils import get_captcha_image logger = logging.getLogger(__name__) @PyroClient.on_callback_query(filters.regex(r"verify:[\s\S]*")) # type: ignore async def callback_verify(app: PyroClient, callback: CallbackQuery): parsed = CallbackVerify.from_callback(callback) group = await PyroGroup.create_if_not_exists(callback.message.chat.id, None, True) locale = group.select_locale(app, callback.message.from_user) if callback.from_user.id != parsed.user_id: await callback.answer( app._("wrong_user", "callbacks", locale=locale), show_alert=True ) return user = await app.find_user(callback.from_user, group.id) captcha = get_captcha_image(app.config["emojis"]) logger.info( "Captcha for %s has been generated. All: %s, Correct: %s", user.id, captcha.emojis_all, captcha.emojis_correct, ) scheduled_job = await col_schedule.find_one_and_delete( {"user": parsed.user_id, "group": group.id} ) if scheduled_job is not None and app.scheduler is not None: with contextlib.suppress(JobLookupError): app.scheduler.remove_job(scheduled_job["job_id"]) await user.set_emojis(captcha.emojis_correct) buttons = [ InlineButton(emoji, f"emoji:{user.id}:{emoji}") for emoji in captcha.emojis_all ] keyboard = InlineKeyboard(3) keyboard.add(*buttons) await callback.message.delete() captcha_message = await app.send_photo( group.id, captcha.image, caption=app._("verify", "messages", locale=locale), reply_markup=keyboard, ) del captcha if app.scheduler is not None: app.scheduler.add_job( kick_unverified, "date", [app, user.id, group.id, captcha_message.id], run_date=datetime.now() + timedelta(seconds=group.timeout_verify), )