This repository has been archived on 2024-05-31. You can view files and clone it, but cannot push or open issues or pull requests.
Telegram/modules/inline.py

310 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Module responsible for providing answers to
all inline queries that bot receives"""
from datetime import datetime
from os import path, sep
from app import app, isAnAdmin
from pyrogram.types import (
InlineQueryResultArticle,
InputTextMessageContent,
InlineQuery,
InlineKeyboardMarkup,
InlineKeyboardButton,
)
from pyrogram.client import Client
from pyrogram.enums.chat_type import ChatType
from pyrogram.enums.chat_members_filter import ChatMembersFilter
from dateutil.relativedelta import relativedelta
from classes.errors.holo_user import UserNotFoundError, UserInvalidError
from classes.holo_user import HoloUser
from modules.logging import logWrite
from modules.utils import configGet, jsonLoad, locale
from modules.database import col_applications, col_spoilers
from bson.objectid import ObjectId
from bson.errors import InvalidId
@app.on_inline_query()
async def inline_answer(client: Client, inline_query: InlineQuery):
results = []
if configGet("allow_external", "features", "spoilers") is True:
if inline_query.query.startswith("spoiler:"):
try:
spoil = col_spoilers.find_one(
{"_id": ObjectId(inline_query.query.removeprefix("spoiler:"))}
)
if spoil is not None:
desc = locale(
"spoiler_described",
"message",
locale=inline_query.from_user,
).format(
locale(spoil["category"], "message", "spoiler_categories"),
spoil["description"],
)
results = [
InlineQueryResultArticle(
title=locale(
"title",
"inline",
"spoiler",
locale=inline_query.from_user,
),
description=locale(
"description",
"inline",
"spoiler",
locale=inline_query.from_user,
),
input_message_content=InputTextMessageContent(
desc, disable_web_page_preview=True
),
reply_markup=InlineKeyboardMarkup(
[
[
InlineKeyboardButton(
locale(
"spoiler_view",
"button",
locale=inline_query.from_user,
),
callback_data=f'sid_{inline_query.query.removeprefix("spoiler:")}',
)
]
]
),
)
]
except InvalidId:
results = []
await inline_query.answer(results=results)
return
if inline_query.chat_type in [ChatType.CHANNEL]:
await inline_query.answer(
results=[
InlineQueryResultArticle(
title=locale(
"title", "inline", "not_pm", locale=inline_query.from_user
),
input_message_content=InputTextMessageContent(
locale(
"message_content",
"inline",
"not_pm",
locale=inline_query.from_user,
)
),
description=locale(
"description", "inline", "not_pm", locale=inline_query.from_user
),
)
]
)
return
results_forbidden = [
InlineQueryResultArticle(
title=locale("title", "inline", "forbidden", locale=inline_query.from_user),
input_message_content=InputTextMessageContent(
locale(
"message_content",
"inline",
"forbidden",
locale=inline_query.from_user,
)
),
description=locale(
"description", "inline", "forbidden", locale=inline_query.from_user
),
)
]
try:
holo_user = HoloUser(inline_query.from_user)
except (UserNotFoundError, UserInvalidError):
logWrite(
f"Could not find application of {inline_query.from_user.id}, ignoring inline query",
debug=True,
)
await inline_query.answer(results=results_forbidden)
return
if path.exists(path.join(configGet("cache", "locations"), "group_members")) and (
inline_query.from_user.id
not in jsonLoad(path.join(configGet("cache", "locations"), "group_members"))
):
if path.exists(path.join(configGet("cache", "locations"), "admins")) and (
inline_query.from_user.id
not in jsonLoad(path.join(configGet("cache", "locations"), "admins"))
):
logWrite(
f"{inline_query.from_user.id} is not an admin and not in members group, ignoring inline query",
debug=True,
)
await inline_query.answer(results=results_forbidden)
return
if holo_user.application_approved() or (await isAnAdmin(holo_user.id) is True):
max_results = (
configGet("inline_preview_count") if inline_query.query != "" else 200
)
list_of_users = []
async for m in app.get_chat_members(
configGet("users", "groups"),
limit=max_results,
filter=ChatMembersFilter.SEARCH,
query=inline_query.query,
):
list_of_users.append(m)
for match in list_of_users:
application = col_applications.find_one({"user": match.user.id})
if application is None:
continue
application_content = []
i = 1
for question in application["application"]:
if i == 2:
age = relativedelta(datetime.now(), application["application"]["2"])
application_content.append(
f"{locale(f'question{i}', 'message', 'question_titles', locale=inline_query.from_user)} {application['application']['2'].strftime('%d.%m.%Y')} ({age.years} р.)"
)
elif i == 3:
if application["application"]["3"]["countryCode"] == "UA":
application_content.append(
f"{locale(f'question{i}', 'message', 'question_titles', locale=inline_query.from_user)} {application['application']['3']['name']}"
)
else:
application_content.append(
f"{locale(f'question{i}', 'message', 'question_titles', locale=inline_query.from_user)} {application['application']['3']['name']} ({application['application']['3']['adminName1']}, {application['application']['3']['countryName']})"
)
else:
application_content.append(
f"{locale(f'question{i}', 'message', 'question_titles', locale=inline_query.from_user)} {application['application'][question]}"
)
i += 1
if match.user.photo != None:
try:
if not path.exists(
f'{configGet("cache", "locations")}{sep}avatars{sep}{match.user.photo.big_file_id}'
):
print(
f'Downloaded avatar {match.user.photo.big_file_id} of {match.user.id} and uploaded to {configGet("api")}/avatars/{match.user.photo.big_file_id}',
flush=True,
)
await app.download_media(
match.user.photo.big_file_id,
file_name=f'{configGet("cache", "locations")}{sep}avatars{sep}{match.user.photo.big_file_id}',
)
results.append(
InlineQueryResultArticle(
title=str(match.user.first_name),
input_message_content=InputTextMessageContent(
locale(
"message_content",
"inline",
"user",
locale=inline_query.from_user,
).format(
match.user.first_name,
match.user.username,
"\n".join(application_content),
)
),
description=locale(
"description",
"inline",
"user",
locale=inline_query.from_user,
).format(match.user.first_name, match.user.username),
thumb_url=f'{configGet("api")}/avatars/{match.user.photo.big_file_id}',
)
)
except ValueError:
results.append(
InlineQueryResultArticle(
title=str(match.user.first_name),
input_message_content=InputTextMessageContent(
locale(
"message_content",
"inline",
"user",
locale=inline_query.from_user,
).format(
match.user.first_name,
match.user.username,
"\n".join(application_content),
)
),
description=locale(
"description",
"inline",
"user",
locale=inline_query.from_user,
).format(match.user.first_name, match.user.username),
)
)
except FileNotFoundError:
results.append(
InlineQueryResultArticle(
title=str(match.user.first_name),
input_message_content=InputTextMessageContent(
locale(
"message_content",
"inline",
"user",
locale=inline_query.from_user,
).format(
match.user.first_name,
match.user.username,
"\n".join(application_content),
)
),
description=locale(
"description",
"inline",
"user",
locale=inline_query.from_user,
).format(match.user.first_name, match.user.username),
)
)
else:
results.append(
InlineQueryResultArticle(
title=str(match.user.first_name),
input_message_content=InputTextMessageContent(
locale(
"message_content",
"inline",
"user",
locale=inline_query.from_user,
).format(
match.user.first_name,
match.user.username,
"\n".join(application_content),
)
),
description=locale(
"description",
"inline",
"user",
locale=inline_query.from_user,
).format(match.user.first_name, match.user.username),
)
)
await inline_query.answer(results=results, cache_time=10, is_personal=True)