from datetime import datetime from polyglot.detect import Detector from pyrogram import filters from pyrogram.client import Client from pyrogram.enums import MessageEntityType, PollType from pyrogram.types import Message from app import app from modules import custom_filters from modules.database import col_analytics_group from modules.logging import logWrite from modules.utils import configGet @app.on_message( custom_filters.enabled_general & ~filters.scheduled & filters.chat(configGet("users", "groups")) ) async def msg_destination_group(app: Client, msg: Message): analytics_entry = { "id": msg.id, "user": msg.from_user.id, "date": datetime.now(), "reply": { "id": msg.reply_to_message_id, "top_id": msg.reply_to_top_message_id, "user": None if msg.reply_to_message is None else msg.reply_to_message.from_user.id, }, "forward": { "id": msg.forward_from_message_id, "chat": None if msg.forward_from_chat is None else msg.forward_from_chat.id, "user": None if msg.forward_from is None else msg.forward_from.id, "date": msg.forward_date, }, "media_spoilered": msg.has_media_spoiler, "entities": {"links": [], "mentions": []}, "text": None, "language": None, "language_confidence": None, "animation": None, "audio": None, "contact": None, "document": None, "location": None, "photo": None, "poll": None, "sticker": None, "venue": None, "video": None, "videonote": None, "voice": None, } if msg.text is not None or msg.caption is not None: text = msg.text if msg.text is not None else msg.caption analytics_entry["text"] = text if msg.entities is not None or msg.caption_entities is not None: entities = ( msg.entities if msg.entities is not None else msg.caption_entities ) for entity in entities: if entity.type == MessageEntityType.TEXT_LINK: analytics_entry["entities"]["links"].append(entity.url) elif entity.type == MessageEntityType.URL: analytics_entry["entities"]["links"].append( text[entity.offset : entity.offset + entity.length] ) elif entity.type == MessageEntityType.TEXT_MENTION: analytics_entry["entities"]["mentions"].append(entity.user.id) elif entity.type == MessageEntityType.MENTION: analytics_entry["entities"]["mentions"].append( text[entity.offset : entity.offset + entity.length] ) lang = Detector(text, quiet=True).language analytics_entry["language"] = lang.code analytics_entry["language_confidence"] = lang.confidence if lang.code == "ru": logWrite( f"Message '{text}' from {msg.from_user.first_name} ({msg.from_user.id}) is fucking russian [confidence {lang.confidence}]" ) if msg.animation is not None: analytics_entry["animation"] = { "id": msg.animation.file_id, "duration": msg.animation.duration, "height": msg.animation.height, "width": msg.animation.width, "file_name": msg.animation.file_name, "mime_type": msg.animation.mime_type, } if msg.audio is not None: analytics_entry["audio"] = { "id": msg.audio.file_id, "title": msg.audio.title, "performer": msg.audio.performer, "duration": msg.audio.duration, "file_name": msg.audio.file_name, "file_size": msg.audio.file_size, "mime_type": msg.audio.mime_type, } if msg.contact is not None: analytics_entry["contact"] = { "id": msg.contact.user_id, "first_name": msg.contact.first_name, "last_name": msg.contact.last_name, "phone_number": msg.contact.phone_number, "vcard": msg.contact.vcard, } if msg.document is not None: analytics_entry["document"] = { "id": msg.document.file_id, "file_name": msg.document.file_name, "file_size": msg.document.file_size, "mime_type": msg.document.mime_type, } if msg.location is not None: analytics_entry["location"] = { "longitude": msg.location.longitude, "latitude": msg.location.latitude, } if msg.photo is not None: thumbnails = [] for thumbail in msg.photo.thumbs: thumbnails.append( { "id": thumbail.file_id, "height": thumbail.height, "width": thumbail.width, "file_size": thumbail.file_size, } ) analytics_entry["photo"] = { "id": msg.photo.file_id, "height": msg.photo.height, "width": msg.photo.width, "file_size": msg.photo.file_size, "thumbnails": thumbnails, } if msg.poll is not None: options = [] for option in msg.poll.options: options.append(option.text) analytics_entry["poll"] = { "id": msg.poll.id, "question": msg.poll.question, "open_period": msg.poll.open_period, "close_date": msg.poll.close_date, "options": options, "correct_option": msg.poll.correct_option_id, "explanation": msg.poll.explanation, "anonymous": msg.poll.is_anonymous, "multiple_answers": msg.poll.allows_multiple_answers, "quiz": True if msg.poll.type == PollType.QUIZ else False, } if msg.sticker is not None: thumbnails = [] for thumbail in msg.sticker.thumbs: thumbnails.append( { "id": thumbail.file_id, "height": thumbail.height, "width": thumbail.width, "file_size": thumbail.file_size, } ) analytics_entry["sticker"] = { "id": msg.sticker.file_id, "emoji": msg.sticker.emoji, "set_name": msg.sticker.set_name, "animated": msg.sticker.is_animated, "video": msg.sticker.is_video, "height": msg.sticker.height, "width": msg.sticker.width, "file_name": msg.sticker.file_name, "file_size": msg.sticker.file_size, "mime_type": msg.sticker.mime_type, "thumbnails": thumbnails, } if msg.venue is not None: analytics_entry["venue"] = { "title": msg.venue.title, "address": msg.venue.address, "longitude": msg.venue.location.longitude, "latitude": msg.venue.location.latitude, "foursquare_id": msg.venue.foursquare_id, "foursquare_type": msg.venue.foursquare_type, } if msg.video is not None: thumbnails = [] for thumbail in msg.video.thumbs: thumbnails.append( { "id": thumbail.file_id, "height": thumbail.height, "width": thumbail.width, "file_size": thumbail.file_size, } ) analytics_entry["video"] = { "id": msg.video.file_id, "duration": msg.video.duration, "height": msg.video.height, "width": msg.video.width, "file_name": msg.video.file_name, "file_size": msg.video.file_size, "mime_type": msg.video.mime_type, "thumbnails": thumbnails, } if msg.video_note is not None: thumbnails = [] for thumbail in msg.video_note.thumbs: thumbnails.append( { "id": thumbail.file_id, "height": thumbail.height, "width": thumbail.width, "file_size": thumbail.file_size, } ) analytics_entry["videonote"] = { "id": msg.video_note.file_id, "duration": msg.video_note.duration, "length": msg.video_note.length, "file_size": msg.video_note.file_size, "mime_type": msg.video_note.mime_type, "thumbnails": thumbnails, } if msg.voice is not None: analytics_entry["voice"] = { "id": msg.voice.file_id, "duration": msg.voice.duration, "file_size": msg.voice.file_size, "mime_type": msg.voice.mime_type, } col_analytics_group.insert_one(analytics_entry)