/nearby, subscriptions check, geocoding #2
5
classes/errors/geo.py
Normal file
5
classes/errors/geo.py
Normal file
@ -0,0 +1,5 @@
|
||||
class PlaceNotFoundError(Exception):
|
||||
"""Query provided did not lead to any city or populated area"""
|
||||
def __init__(self, query):
|
||||
self.query = query
|
||||
super().__init__(f"Could not find any place on geonames.org of feature classes A and P by query '{self.query}'")
|
@ -8,9 +8,10 @@ from typing import Any, List, Literal, Union
|
||||
from pyrogram.types import User, ChatMember, ChatPrivileges, Chat, Message, Photo, Video, Document, Animation, Voice, ForceReply, ReplyKeyboardMarkup
|
||||
from pyrogram.errors import bad_request_400
|
||||
from dateutil.relativedelta import relativedelta
|
||||
from classes.errors.geo import PlaceNotFoundError
|
||||
from modules.database import col_tmp, col_users, col_context, col_warnings, col_applications, col_sponsorships, col_messages
|
||||
from modules.logging import logWrite
|
||||
from modules.utils import configGet, locale, should_quote
|
||||
from modules.utils import configGet, find_location, locale, should_quote
|
||||
|
||||
class DefaultApplicationTemp(dict):
|
||||
def __init__(self, user: int, reapply: bool = False):
|
||||
@ -350,6 +351,10 @@ class HoloUser():
|
||||
)
|
||||
|
||||
progress = col_tmp.find_one({"user": self.id, "type": "application"})
|
||||
|
||||
if progress is None:
|
||||
return
|
||||
|
||||
stage = progress["stage"]
|
||||
|
||||
if self.sponsorship_state()[0] == "fill":
|
||||
@ -385,13 +390,16 @@ class HoloUser():
|
||||
|
||||
elif stage == 3:
|
||||
try:
|
||||
result = (get(f"http://api.geonames.org/searchJSON?q={query}&maxRows=1&countryBias=UA&lang=uk&orderby=relevance&featureClass=P&featureClass=A&username={configGet('username', 'geocoding')}")).json()
|
||||
progress["application"][str(stage)] = result["geonames"][0]
|
||||
progress["application"][str(stage)] = find_location(query)
|
||||
if ("lat" in progress["application"][str(stage)] and "lng" in progress["application"][str(stage)]):
|
||||
progress["application"][str(stage)]["loc"] = [progress["application"][str(stage)]["lng"], progress["application"][str(stage)]["lat"]]
|
||||
del progress["application"][str(stage)]["lat"]
|
||||
del progress["application"][str(stage)]["lng"]
|
||||
col_tmp.update_one({"user": {"$eq": self.id}, "type": {"$eq": "application"}}, {"$set": {"application": progress["application"], "stage": progress["stage"]+1}})
|
||||
await msg.reply_text(locale(f"question3_found", "message", locale=self.locale).format(result["geonames"][0]["name"], result["geonames"][0]["adminName1"]))
|
||||
await msg.reply_text(locale("question3_found", "message", locale=self.locale).format(progress["application"][str(stage)]["name"], progress["application"][str(stage)]["adminName1"]))
|
||||
await msg.reply_text(locale(f"question{stage+1}", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage+1}", "force_reply", locale=self.locale))))
|
||||
except (ValueError, KeyError, IndexError):
|
||||
await msg.reply_text(locale(f"question3_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
|
||||
except PlaceNotFoundError:
|
||||
await msg.reply_text(locale("question3_invalid", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
|
||||
return
|
||||
except Exception as exp:
|
||||
await msg.reply_text(locale("question3_error", "message", locale=self.locale), reply_markup=ForceReply(placeholder=str(locale(f"question{stage}", "force_reply", locale=self.locale))))
|
||||
@ -470,6 +478,10 @@ class HoloUser():
|
||||
if col_tmp.find_one({"user": self.id, "type": "sponsorship"}) is not None:
|
||||
|
||||
progress = col_tmp.find_one({"user": self.id, "type": "sponsorship"})
|
||||
|
||||
if progress is None:
|
||||
return
|
||||
|
||||
stage = progress["stage"]
|
||||
|
||||
if progress["state"] == "fill" and progress["sent"] is False:
|
||||
|
@ -92,6 +92,8 @@
|
||||
"label_too_long": "Довжина назви ролі не повинна перевищувати 16 символів",
|
||||
"finish_sponsorship": "❌ **Дія неможлива**\nПерш ніж заповнювати анкету, треба завершити заповнення форми спонсора.",
|
||||
"finish_application": "❌ **Дія неможлива**\nПерш ніж заповнювати форму спонсора, треба завершити заповнення анкети.",
|
||||
"nearby_invalid": "",
|
||||
"nearby_error": "",
|
||||
"voice_message": [
|
||||
"why are u gae",
|
||||
"руки відірвало? пиши як людина",
|
||||
|
@ -4,25 +4,35 @@ from pyrogram.types import Message
|
||||
from pyrogram.client import Client
|
||||
from classes.holo_user import HoloUser
|
||||
from modules import custom_filters
|
||||
from modules.utils import configGet, should_quote
|
||||
from modules.utils import configGet, locale, should_quote, find_location
|
||||
from modules.database import col_applications
|
||||
from classes.errors.geo import PlaceNotFoundError
|
||||
|
||||
# Nearby command ===============================================================================================================
|
||||
@app.on_message(~ filters.scheduled & (custom_filters.allowed | custom_filters.admin) & (filters.private | (filters.chat(configGet("admin_group")) | filters.chat(configGet("destination_group")))) & filters.command(["nearby"], prefixes=["/"]))
|
||||
async def cmd_nearby(app: Client, msg: Message):
|
||||
|
||||
if len(msg.command) < 1:
|
||||
holo_user = HoloUser(msg.from_user)
|
||||
|
||||
if len(msg.command) < 2:
|
||||
application = col_applications.find_one({"user": msg.from_user})
|
||||
if application is None:
|
||||
await msg.reply_text("You have no application")
|
||||
await msg.reply_text(locale("nearby_user_empty", "message", locale=holo_user.locale))
|
||||
return
|
||||
location = application["application"]["3"]["loc"][0], application["application"]["3"]["loc"][1]
|
||||
else:
|
||||
# find location
|
||||
location = "result"
|
||||
try:
|
||||
location_coordinates = find_location(" ".join(msg.command[2:]))
|
||||
location = location_coordinates["lng"], location_coordinates["lat"]
|
||||
except PlaceNotFoundError:
|
||||
await msg.reply_text(locale("nearby_invalid", "message", locale=holo_user.locale), quote=should_quote(msg))
|
||||
return
|
||||
except Exception as exp:
|
||||
await msg.reply_text(locale("nearby_error", "message", locale=holo_user.locale), quote=should_quote(msg))
|
||||
return
|
||||
|
||||
output = []
|
||||
users_nearby = col_applications.find( {"application": {"loc": { "$within": { "$center": [[location], 5] } }}} )
|
||||
users_nearby = col_applications.find( {"application.loc": {"$near": { "$geometry": { "type": "Point", "coordinates": location }, "$maxDistance": 30000 }} } )
|
||||
|
||||
for user in users_nearby:
|
||||
output.append(user)
|
||||
|
@ -1,4 +1,4 @@
|
||||
from pymongo import MongoClient
|
||||
from pymongo import MongoClient, GEO2D
|
||||
from ujson import loads
|
||||
|
||||
with open("config.json", "r", encoding="utf-8") as f:
|
||||
@ -35,4 +35,6 @@ col_context = db.get_collection("context")
|
||||
col_messages = db.get_collection("messages")
|
||||
col_warnings = db.get_collection("warnings")
|
||||
col_applications = db.get_collection("applications")
|
||||
col_sponsorships = db.get_collection("sponsorships")
|
||||
col_sponsorships = db.get_collection("sponsorships")
|
||||
|
||||
col_applications.create_index([("application.3.loc", GEO2D)])
|
@ -1,4 +1,5 @@
|
||||
from typing import Any, Union
|
||||
from requests import get
|
||||
from pyrogram.enums.chat_type import ChatType
|
||||
from pyrogram.types import User
|
||||
from pyrogram.client import Client
|
||||
@ -10,6 +11,7 @@ from sys import exit
|
||||
from os import kill, listdir, sep
|
||||
from os import name as osname
|
||||
from traceback import print_exc
|
||||
from classes.errors.geo import PlaceNotFoundError
|
||||
|
||||
from modules.logging import logWrite
|
||||
|
||||
@ -171,6 +173,24 @@ def all_locales(key: str, *args: str) -> list:
|
||||
|
||||
return output
|
||||
|
||||
def find_location(query: str) -> dict:
|
||||
"""Find location on geonames.org by query. Search is made with feature classes A and P.
|
||||
|
||||
### Args:
|
||||
* query (`str`): Some city/village/state name
|
||||
|
||||
### Raises:
|
||||
* PlaceNotFoundError: Exception is raised when API result is empty
|
||||
|
||||
### Returns:
|
||||
* `dict`: One instance of geonames response
|
||||
"""
|
||||
try:
|
||||
result = (get(f"http://api.geonames.org/searchJSON?q={query}&maxRows=1&countryBias=UA&lang=uk&orderby=relevance&featureClass=P&featureClass=A&username={configGet('username', 'geocoding')}")).json()
|
||||
return result["geonames"][0]
|
||||
except (ValueError, KeyError, IndexError):
|
||||
raise PlaceNotFoundError(query)
|
||||
|
||||
try:
|
||||
from psutil import Process
|
||||
except ModuleNotFoundError:
|
||||
|
Reference in New Issue
Block a user