WIP: analytics

This commit is contained in:
Profitroll 2023-04-06 16:08:33 +02:00
parent 2e7d4aa263
commit e8541b5160
4 changed files with 252 additions and 46 deletions

View File

@ -39,10 +39,10 @@ from modules.callbacks.sub import *
from modules.callbacks.sus import *
from modules.callbacks.warnings import *
from modules.handlers.analytics_group import *
from modules.handlers.confirmation import *
from modules.handlers.contact import *
from modules.handlers.group_member_update import *
from modules.handlers.group_message import *
from modules.handlers.voice import *
from modules.handlers.welcome import *
from modules.handlers.everything import *

View File

@ -37,6 +37,8 @@ for collection in [
"warnings",
"applications",
"sponsorships",
"analytics_group",
"analytics_users"
]:
if not collection in collections:
db.create_collection(collection)
@ -51,5 +53,7 @@ col_messages = db.get_collection("messages")
col_warnings = db.get_collection("warnings")
col_applications = db.get_collection("applications")
col_sponsorships = db.get_collection("sponsorships")
col_analytics_group = db.get_collection("analytics_group")
col_analytics_users = db.get_collection("analytics_users")
col_applications.create_index([("application.3.location", GEOSPHERE)])

View File

@ -0,0 +1,247 @@
from datetime import datetime
from polyglot.detect import Detector
from pyrogram import filters
from pyrogram.client import Client
from pyrogram.enums import MessageEntityType, PollType
from pyrogram.types import Message
from app import app
from modules import custom_filters
from modules.database import col_analytics_group
from modules.logging import logWrite
from modules.utils import configGet
@app.on_message(
custom_filters.enabled_general
& ~filters.scheduled
& filters.chat(configGet("users", "groups"))
)
async def msg_destination_group(app: Client, msg: Message):
analytics_entry = {
"id": msg.id,
"user": msg.from_user.id,
"date": datetime.now(),
"reply": {
"id": msg.reply_to_message_id,
"top_id": msg.reply_to_top_message_id,
"user": None
if msg.reply_to_message is None
else msg.reply_to_message.from_user.id,
},
"forward": {
"id": msg.forward_from_message_id,
"chat": None if msg.forward_from_chat is None else msg.forward_from_chat.id,
"user": None if msg.forward_from is None else msg.forward_from.id,
"date": msg.forward_date,
},
"media_spoilered": msg.has_media_spoiler,
"entities": {"links": [], "mentions": []},
"text": None,
"language": None,
"language_confidence": None,
"animation": None,
"audio": None,
"contact": None,
"document": None,
"location": None,
"photo": None,
"poll": None,
"sticker": None,
"venue": None,
"video": None,
"videonote": None,
"voice": None,
}
if msg.text is not None or msg.caption is not None:
text = msg.text if msg.text is not None else msg.caption
analytics_entry["text"] = text
if msg.entities is not None or msg.caption_entities is not None:
entities = (
msg.entities if msg.entities is not None else msg.caption_entities
)
for entity in entities:
if entity.type == MessageEntityType.TEXT_LINK:
analytics_entry["entities"]["links"].append(entity.url)
elif entity.type == MessageEntityType.TEXT_MENTION:
analytics_entry["entities"]["mentions"].append(entity.user.id)
lang = Detector(text, quiet=True).language
analytics_entry["language"] = lang.code
analytics_entry["language_confidence"] = lang.confidence
if lang.code == "ru":
logWrite(
f"Message '{text}' from {msg.from_user.first_name} ({msg.from_user.id}) is fucking russian [confidence {lang.confidence}]"
)
if msg.animation is not None:
analytics_entry["animation"] = {
"id": msg.animation.file_id,
"duration": msg.animation.duration,
"height": msg.animation.height,
"width": msg.animation.width,
"file_name": msg.animation.file_name,
"mime_type": msg.animation.mime_type,
}
if msg.audio is not None:
analytics_entry["audio"] = {
"id": msg.audio.file_id,
"title": msg.audio.title,
"performer": msg.audio.performer,
"duration": msg.audio.duration,
"file_name": msg.audio.file_name,
"file_size": msg.audio.file_size,
"mime_type": msg.audio.mime_type,
}
if msg.contact is not None:
analytics_entry["contact"] = {
"id": msg.contact.user_id,
"first_name": msg.contact.first_name,
"last_name": msg.contact.last_name,
"phone_number": msg.contact.phone_number,
"vcard": msg.contact.vcard,
}
if msg.document is not None:
analytics_entry["document"] = {
"id": msg.document.file_id,
"file_name": msg.document.file_name,
"file_size": msg.document.file_size,
"mime_type": msg.document.mime_type,
}
if msg.location is not None:
analytics_entry["location"] = {
"longitude": msg.location.longitude,
"latitude": msg.location.latitude,
}
if msg.photo is not None:
thumbnails = []
for thumbail in msg.photo.thumbs:
thumbnails.append(
{
"id": thumbail.file_id,
"height": thumbail.height,
"width": thumbail.width,
"file_size": thumbail.file_size,
}
)
analytics_entry["photo"] = {
"id": msg.photo.file_id,
"height": msg.photo.height,
"width": msg.photo.width,
"file_size": msg.photo.file_size,
"thumbnails": thumbnails,
}
if msg.poll is not None:
options = []
for option in msg.poll.options:
options.append(option.text)
analytics_entry["poll"] = {
"id": msg.poll.id,
"question": msg.poll.question,
"open_period": msg.poll.open_period,
"close_date": msg.poll.close_date,
"options": options,
"correct_option": msg.poll.correct_option_id,
"explanation": msg.poll.explanation,
"anonymous": msg.poll.is_anonymous,
"multiple_answers": msg.poll.allows_multiple_answers,
"quiz": True if msg.poll.type == PollType.QUIZ else False,
}
if msg.sticker is not None:
thumbnails = []
for thumbail in msg.sticker.thumbs:
thumbnails.append(
{
"id": thumbail.file_id,
"height": thumbail.height,
"width": thumbail.width,
"file_size": thumbail.file_size,
}
)
analytics_entry["sticker"] = {
"id": msg.sticker.file_id,
"emoji": msg.sticker.emoji,
"set_name": msg.sticker.set_name,
"animated": msg.sticker.is_animated,
"video": msg.sticker.is_video,
"height": msg.sticker.height,
"width": msg.sticker.width,
"file_name": msg.sticker.file_name,
"file_size": msg.sticker.file_size,
"mime_type": msg.sticker.mime_type,
"thumbnails": thumbnails,
}
if msg.venue is not None:
analytics_entry["venue"] = {
"title": msg.venue.title,
"address": msg.venue.address,
"longitude": msg.venue.location.longitude,
"latitude": msg.venue.location.latitude,
"foursquare_id": msg.venue.foursquare_id,
"foursquare_type": msg.venue.foursquare_type,
}
if msg.video is not None:
thumbnails = []
for thumbail in msg.video.thumbs:
thumbnails.append(
{
"id": thumbail.file_id,
"height": thumbail.height,
"width": thumbail.width,
"file_size": thumbail.file_size,
}
)
analytics_entry["video"] = {
"id": msg.video.file_id,
"duration": msg.video.duration,
"height": msg.video.height,
"width": msg.video.width,
"file_name": msg.video.file_name,
"file_size": msg.video.file_size,
"mime_type": msg.video.mime_type,
"thumbnails": thumbnails,
}
if msg.video_note is not None:
thumbnails = []
for thumbail in msg.video_note.thumbs:
thumbnails.append(
{
"id": thumbail.file_id,
"height": thumbail.height,
"width": thumbail.width,
"file_size": thumbail.file_size,
}
)
analytics_entry["video_note"] = {
"id": msg.video_note.file_id,
"duration": msg.video_note.duration,
"length": msg.video_note.length,
"file_size": msg.video_note.file_size,
"mime_type": msg.video_note.mime_type,
"thumbnails": thumbnails,
}
if msg.voice is not None:
analytics_entry["voice"] = {
"id": msg.voice.file_id,
"duration": msg.voice.duration,
"file_size": msg.voice.file_size,
"mime_type": msg.voice.mime_type,
}
col_analytics_group.insert_one(analytics_entry)

View File

@ -1,45 +0,0 @@
from datetime import datetime
from app import app
from pyrogram import filters
from pyrogram.types import Message
from pyrogram.client import Client
from modules.logging import logWrite
from modules.utils import configGet, locale
from modules.database import col_warnings
from modules import custom_filters
from polyglot.detect import Detector
@app.on_message(
custom_filters.enabled_general
& ~filters.scheduled
& filters.chat(configGet("users", "groups"))
)
async def msg_destination_group(app: Client, msg: Message):
if msg.text is not None:
lang = Detector(msg.text, quiet=True).language
if lang.code == "ru":
logWrite(
f"Message '{msg.text}' from {msg.from_user.first_name} ({msg.from_user.id}) is fucking russian!!! [confidence {lang.confidence}]"
)
else:
logWrite(
f"Message '{msg.text}' from {msg.from_user.first_name} ({msg.from_user.id}) is written {lang.code} [confidence {lang.confidence}]"
)
return
elif msg.caption is not None:
lang = Detector(msg.caption, quiet=True).language
if lang.code == "ru":
logWrite(
f"Message '{msg.caption}' from {msg.from_user.first_name} ({msg.from_user.id}) is fucking russian!!! [confidence {lang.confidence}]"
)
else:
logWrite(
f"Message '{msg.caption}' from {msg.from_user.first_name} ({msg.from_user.id}) is written {lang.code} [confidence {lang.confidence}]"
)
return
else:
logWrite(
f"Message from {msg.from_user.first_name} ({msg.from_user.id}) has no text in it."
)
return