Initial commit

This commit is contained in:
2023-08-27 22:43:16 +02:00
commit 502ed0406e
40 changed files with 1854 additions and 0 deletions

View File

@@ -0,0 +1,106 @@
import logging
from io import BytesIO
from convopyro import listen_message
from pykeyboard import ReplyButton, ReplyKeyboard
from pyrogram import filters
from pyrogram.types import Message, ReplyKeyboardRemove
from ujson import dumps
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["checkout"], prefixes=["/"]) # type: ignore
)
async def command_checkout(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
user_data = BytesIO(
dumps(await user.checkout(), escape_forward_slashes=False).encode()
)
await message.reply_document(
user_data,
file_name="user_data.json",
)
# Data deletion request
keyboard_delete = ReplyKeyboard(
row_width=1, resize_keyboard=True, one_time_keyboard=True
)
keyboard_delete.add(
ReplyButton("Yes, I want to delete it"),
ReplyButton("No, I don't want to delete it"),
)
await message.reply_text(
"Here's pretty much all the data bot has. Please, use these buttons to choose whether you want to delete your data from the bot.",
reply_markup=keyboard_delete,
)
while True:
answer_delete = await listen_message(app, message.chat.id, 300)
if answer_delete is None or answer_delete.text == "/cancel":
await message.reply_text(
"Cancelled.",
reply_markup=ReplyKeyboardRemove(),
)
return
if answer_delete.text not in [
"Yes, I want to delete it",
"No, I don't want to delete it",
]:
await answer_delete.reply_text(
"Invalid answer provided. Use /cancel if you want to cancel this operation."
)
continue
if answer_delete.text in [
"No, I don't want to delete it",
]:
await answer_delete.reply_text(
"Alright, cancelled.", reply_markup=ReplyKeyboardRemove()
)
return
break
# Confirmation
keyboard_confirm = ReplyKeyboard(
row_width=1, resize_keyboard=True, one_time_keyboard=True
)
keyboard_confirm.add(ReplyButton("I agree and want to proceed"))
await message.reply_text(
"Alright. Please, confirm that you want to delete your data from the bot.\n\nFollowing data will be deleted:\nSelected location, preferred language of the messages, notifications time and your notifications offset.",
reply_markup=keyboard_confirm,
)
while True:
answer_confirm = await listen_message(app, message.chat.id, 300)
if answer_confirm is None or answer_confirm.text == "/cancel":
await message.reply_text(
"Cancelled.",
reply_markup=ReplyKeyboardRemove(),
)
return
if answer_confirm.text not in ["I agree and want to proceed"]:
await answer_confirm.reply_text(
"Invalid answer provided. Use /cancel if you want to cancel this operation."
)
continue
break
await user.delete()
await answer_confirm.reply_text(
"Your data has been deleted. If you want to start using this bot again, please use /setup command. Otherwise delete/block the bot and do not interact with it anymore.",
reply_markup=ReplyKeyboardRemove(),
)

13
plugins/commands/help.py Normal file
View File

@@ -0,0 +1,13 @@
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["help"], prefixes=["/"]) # type: ignore
)
async def command_help(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(app._("help", "messages", locale=user.locale))

View File

@@ -0,0 +1,74 @@
from datetime import datetime
from typing import List, Mapping, Union
from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import Message
from ujson import loads
from classes.pyroclient import PyroClient
from modules import custom_filters
from modules.database import col_entries
@PyroClient.on_message(
~filters.scheduled & filters.private & custom_filters.owner & filters.command(["import"], prefixes=["/"]) # type: ignore
)
async def command_import(app: PyroClient, message: Message):
await message.reply_text("Alright. Send me a valid JSON.")
while True:
answer = await listen_message(app, message.chat.id, 300)
if answer is None or answer.text == "/cancel":
await message.reply_text("Cancelled.")
return
if answer.document is None or answer.document.mime_type != "application/json":
await answer.reply_text(
"Invalid input. Please, send me a JSON file with entries."
)
continue
break
file = await app.download_media(answer, in_memory=True)
entries: List[Mapping[str, Union[str, int]]] = loads(bytes(file.getbuffer())) # type: ignore
for entry in entries:
if not isinstance(entries, list):
await answer.reply_text("This is not a valid garbage collection JSON.")
return
for key in ("locations", "garbage_type", "date"):
if (
key not in entry
or (key == "garbage_type" and not isinstance(entry[key], int))
or (key == "locations" and not isinstance(entry[key], list))
):
await answer.reply_text("This is not a valid garbage collection JSON.")
return
if key == "date":
try:
datetime.fromisoformat(str(entry[key]))
except (ValueError, TypeError):
await answer.reply_text(
"Entries contain invalid date formats. Use **ISO 8601** date format."
)
return
entries_clean: List[Mapping[str, Union[str, int, datetime]]] = [
{
"locations": entry["locations"],
"garbage_type": entry["garbage_type"],
"date": datetime.fromisoformat(str(entry["date"])),
}
for entry in entries
]
await col_entries.insert_many(entries_clean)
await answer.reply_text(
f"You have successfully inserted {len(entries_clean)} entries."
)

View File

@@ -0,0 +1,12 @@
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["remove_commands"], prefixes=["/"]) # type: ignore
)
async def command_remove_commands(app: PyroClient, message: Message):
await message.reply_text("Okay.")
await app.remove_commands(command_sets=await app.collect_commands())

View File

@@ -0,0 +1,60 @@
import logging
from datetime import datetime
from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["set_offset"], prefixes=["/"]) # type: ignore
)
async def command_set_offset(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(
"Alright. Please, send how many days in advance do you want to get a notification about the collection.",
reply_markup=ForceReply(placeholder="Number of days"),
)
while True:
answer = await listen_message(app, message.chat.id, 300)
if answer is None or answer.text == "/cancel":
await message.reply_text("Cancelled.", reply_markup=ReplyKeyboardRemove())
return
try:
num = int(answer.text)
if num < 0 or num > 7:
raise ValueError(
"Offset bust not be less than 0 and greater than 7 days."
)
except (ValueError, TypeError):
await answer.reply_text(
"Please, provide a valid integer number of days in range 0 to 7 (inclusive). Use /cancel if you want to cancel this operation."
)
continue
break
offset = int(answer.text)
await user.update_offset(offset)
logger.info("User %s has set offset to %s", user.id, offset)
notice = "" if user.enabled else "Execute /toggle to enable notifications."
garbage_time = datetime(
1970, 1, 1, hour=user.time_hour, minute=user.time_minute
).strftime(app._("time", "formats"))
await answer.reply_text(
f"Notifications time has been updated! You will now receive notification about collection **{offset} d.** before the collection at {garbage_time}. {notice}",
reply_markup=ReplyKeyboardRemove(),
)

View File

@@ -0,0 +1,58 @@
import logging
from datetime import datetime
from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import ForceReply, Message, ReplyKeyboardRemove
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["set_time"], prefixes=["/"]) # type: ignore
)
async def command_set_time(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(
"Alright. Please, send your desired time in HH:MM format.",
reply_markup=ForceReply(placeholder="Time as HH:MM"),
)
while True:
answer = await listen_message(app, message.chat.id, 300)
if answer is None or answer.text == "/cancel":
await message.reply_text("Cancelled.", reply_markup=ReplyKeyboardRemove())
return
try:
datetime.strptime(answer.text, "%H:%M")
except ValueError:
await answer.reply_text(
"Please, provide a valid time in HH:MM format. Use /cancel if you want to cancel this operation."
)
continue
break
user_time = datetime.strptime(answer.text, "%H:%M")
await user.update_time(hour=user_time.hour, minute=user_time.minute)
logger.info(
"User %s has selected notification time of %s",
user.id,
user_time.strftime("%H:%M"),
)
notice = "" if user.enabled else "Execute /toggle to enable notifications."
garbage_time = user_time.strftime(app._("time", "formats"))
await answer.reply_text(
f"Notifications time has been updated! You will now receive notification about collection {user.offset} d. before the collection at **{garbage_time}**. {notice}",
reply_markup=ReplyKeyboardRemove(),
)

104
plugins/commands/setup.py Normal file
View File

@@ -0,0 +1,104 @@
import logging
from convopyro import listen_message
from libbot import i18n
from pykeyboard import ReplyButton, ReplyKeyboard
from pyrogram import filters
from pyrogram.types import Message, ReplyKeyboardRemove
from classes.pyroclient import PyroClient
logger = logging.getLogger(__name__)
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["setup"] + i18n.sync.in_all_locales("configure", "buttons"), prefixes=["/", ""]) # type: ignore
)
async def command_setup(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(
"Holy... This one is still WIP...", reply_markup=ReplyKeyboardRemove()
)
# # City selection
# city_names = [city_iter.name for city_iter in await app.get_cities()]
# keyboard_cities = ReplyKeyboard(resize_keyboard=True, row_width=2)
# keyboard_cities.add(*[ReplyButton(name) for name in city_names])
# await message.reply_text(
# "Alright. Please, use the keyboard provided to choose your town.",
# reply_markup=keyboard_cities,
# )
# while True:
# answer_city = await listen_message(app, message.chat.id, 300)
# if answer_city is None or answer_city.text == "/cancel":
# await message.reply_text("Cancelled.")
# return
# if answer_city.text not in city_names:
# await answer_city.reply_text(
# "Please, select a valid town using keyboard provided. Use /cancel if you want to cancel this operation."
# )
# continue
# break
# # City recognition
# city = await app.find_city(name=answer_city.text)
# # District selection
# district_names = [district_iter.name for district_iter in city.districts]
# keyboard_districts = ReplyKeyboard(resize_keyboard=True, row_width=2)
# keyboard_districts.add(*[ReplyButton(name) for name in district_names])
# await message.reply_text(
# "Alright. Please, use the keyboard provided to choose your district.",
# reply_markup=keyboard_districts,
# )
# while True:
# answer_district = await listen_message(app, message.chat.id, 300)
# if answer_district is None or answer_district.text == "/cancel":
# await message.reply_text("Cancelled.")
# return
# if answer_district.text not in district_names:
# await answer_district.reply_text(
# "Please, select a valid district using keyboard provided. Use /cancel if you want to cancel this operation."
# )
# continue
# break
# # District recognition
# district_results = city.find_district(answer_district.text)
# if len(district_results) == 0:
# await answer_district.reply_text(
# "Something went wrong. Could not find this district in the database.",
# reply_markup=ReplyKeyboardRemove(),
# )
# return
# district = district_results[0]
# await user.update_city(city.id)
# await user.update_district(district.id)
# logger.info(
# "User %s has finished the location set up with city %s and district %s selected",
# user.id,
# city.id,
# district.id,
# )
# notice = "" if user.enabled else "Execute /toggle to enable notifications."
# await answer_district.reply_text(
# f"All set! You will now receive notification about garbage collection in district **{district.name}** of the town **{city.name}**. {notice}",
# reply_markup=ReplyKeyboardRemove(),
# )

View File

@@ -0,0 +1,16 @@
import asyncio
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled
& filters.private
& filters.command(["shutdown", "reboot", "restart"], prefixes=["/"]) # type: ignore
)
async def command_shutdown(app: PyroClient, msg: Message):
if msg.from_user.id == app.owner:
asyncio.get_event_loop().create_task(app.stop())

96
plugins/commands/start.py Normal file
View File

@@ -0,0 +1,96 @@
from datetime import datetime
from convopyro import listen_message
from pyrogram import filters
from pyrogram.types import (
KeyboardButton,
Message,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
)
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["start"], prefixes=["/"]) # type: ignore
)
async def command_start(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await message.reply_text(app._("start", "messages", locale=user.locale))
join_code = None if len(message.command) == 1 else message.command[1]
if join_code is not None:
try:
location = await app.get_location(int(join_code))
except ValueError:
await message.reply_text(
"🚫 You have provided the location but it does not seem to be a valid one. Please, use the command /setup to manually configure the location."
)
return
keyboard = ReplyKeyboardMarkup(
[
[KeyboardButton("Yes, I want to use it")],
[KeyboardButton("No, I don't want to use it")],
],
resize_keyboard=True,
one_time_keyboard=True,
)
await message.reply_text(
f" You have started the bot by the link containing a location **{location.name}**.\n\nPlease, confirm whether you want to use it as your location.",
reply_markup=keyboard,
)
while True:
answer = await listen_message(app, message.chat.id, 300)
if answer is None or answer.text == "/cancel":
await message.reply_text(
"Cancelled.", reply_markup=ReplyKeyboardRemove()
)
return
if answer.text not in [
"Yes, I want to use it",
"No, I don't want to use it",
]:
await answer.reply_text(
"Please, select a valid location using keyboard provided. Use /cancel if you want to cancel this operation."
)
continue
if answer.text in [
"No, I don't want to use it",
]:
await answer.reply_text(
"Alright, you're on your own now. Please, use the command /setup to configure your location and start receiving reminders.",
reply_markup=ReplyKeyboardRemove(),
)
return
break
await user.update_location(location.id)
user_time = datetime(1970, 1, 1, user.time_hour, user.time_minute).strftime(
app._("time", "formats", locale=user.locale)
)
await answer.reply_text(
f"✅ Finished! Your location is now **{location.name}**. You will receive reminders about garbage collection {user.offset} d. in advance at {user_time}.\n\nPlease, visit /help if you want to know how to change notifications time or disable them.",
reply_markup=ReplyKeyboardRemove(),
)
return
if user.location is None:
await message.reply_text(
"📍 Let's configure your location. Press the button on pop-up keyboard to start the process.",
reply_markup=ReplyKeyboardMarkup(
[[KeyboardButton(app._("configure", "buttons", locale=user.locale))]],
resize_keyboard=True,
one_time_keyboard=True,
),
)

View File

@@ -0,0 +1,29 @@
from datetime import datetime
from pyrogram import filters
from pyrogram.types import Message
from classes.pyroclient import PyroClient
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["toggle"], prefixes=["/"]) # type: ignore
)
async def command_toggle(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
await user.update_state(not user.enabled)
if user.enabled:
await message.reply_text("Notifications have been disabled.")
return
if user.location is None:
await message.reply_text(
f"Notifications have been enabled {user.offset} d. before garbage collection at {datetime(1970, 1, 1, user.time_hour, user.time_minute).strftime('%H:%M')}. Use /setup to select your location."
)
return
await message.reply_text(
f"Notifications have been enabled {user.offset} d. before garbage collection at {datetime(1970, 1, 1, user.time_hour, user.time_minute).strftime('%H:%M')} at the **{user.location.name}**."
)

View File

@@ -0,0 +1,59 @@
from datetime import datetime, timedelta, timezone
from pyrogram import filters
from pyrogram.types import Message
from pytz import timezone as pytz_timezone
from classes.garbage_entry import GarbageEntry
from classes.pyroclient import PyroClient
from modules.database import col_entries
@PyroClient.on_message(
~filters.scheduled & filters.private & filters.command(["upcoming"], prefixes=["/"]) # type: ignore
)
async def command_upcoming(app: PyroClient, message: Message):
user = await app.find_user(message.from_user)
if user.location is None:
await message.reply_text(
"You have no location set. Use /setup to select your location."
)
return
date_min = (
datetime.now(pytz_timezone(user.location.timezone)).replace(
second=0, microsecond=0
)
).replace(tzinfo=timezone.utc)
date_max = (
datetime.now(pytz_timezone(user.location.timezone)).replace(
second=0, microsecond=0
)
+ timedelta(days=30)
).replace(tzinfo=timezone.utc)
entries = [
await GarbageEntry.from_record(entry)
async for entry in col_entries.find(
{
"location": {"$in": user.location.id},
"date": {"$gte": date_min, "$lte": date_max},
}
)
]
entries_text = "\n\n".join(
[
f"**{entry.date.strftime(app._('date', 'formats', locale=user.locale))}**:\n{app._(str(entry.garbage_type.value), 'garbage_types')}"
for entry in entries
]
)
if not entries:
await message.reply_text(
f"No garbage collection entries found for the next 30 days at **{user.location.name}**"
)
return
await message.reply_text(f"Upcoming garbage collection:\n\n{entries_text}")