This repository has been archived on 2024-10-10. You can view files and clone it, but cannot push or open issues or pull requests.
TelegramBot/classes/pyrouser.py

232 lines
7.2 KiB
Python

import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Mapping, Tuple, Union
import pytz
from bson import ObjectId
from classes.location import Location
from modules.database import col_users
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
@dataclass
class PyroUser:
    """Dataclass of DB entry of a user.

    Mirrors one document of the `col_users` collection. The `update_*`
    coroutines write the new value to the database first and then store it
    on the instance.
    """

    # Slots are declared by hand; this is compatible with `@dataclass` only
    # as long as none of the fields below gets a default value.
    __slots__ = (
        "_id",
        "id",
        "locale",
        "enabled",
        "location",
        "offset",
        "time_hour",
        "time_minute",
    )

    _id: ObjectId
    id: int
    locale: Union[str, None]
    enabled: bool
    location: Union[Location, None]
    offset: int
    time_hour: int
    time_minute: int

    @classmethod
    async def find(
        cls,
        id: int,
        locale: Union[str, None] = None,
        enabled: bool = True,
        location_id: int = 0,
        offset: int = 1,
        time_hour: int = 16,
        time_minute: int = 0,
    ) -> "PyroUser":
        """Find a user in the database, creating the entry if it is missing.

        All arguments except `id` are used only when a new entry is created.

        ### Args:
            * id (`int`): Value of the `id` field to look the user up by.
            * locale (`Union[str, None]`, *optional*): Locale for a new entry. Defaults to `None`.
            * enabled (`bool`, *optional*): State for a new entry. Defaults to `True`.
            * location_id (`int`, *optional*): Location ID for a new entry. Defaults to `0`.
            * offset (`int`, *optional*): Offset for a new entry. Defaults to `1`.
            * time_hour (`int`, *optional*): Reminder hour for a new entry. Defaults to `16`.
            * time_minute (`int`, *optional*): Reminder minute for a new entry. Defaults to `0`.

        ### Raises:
            * `RuntimeError`: The freshly inserted entry could not be read back.

        ### Returns:
            * `PyroUser`: User object built from the DB entry.
        """
        db_entry = await col_users.find_one({"id": id})

        if db_entry is None:
            inserted = await col_users.insert_one(
                {
                    "id": id,
                    "locale": locale,
                    "enabled": enabled,
                    "location": location_id,
                    "offset": offset,
                    "time_hour": time_hour,
                    "time_minute": time_minute,
                }
            )

            # Read the document back so that it includes the generated `_id`
            db_entry = await col_users.find_one({"_id": inserted.inserted_id})

            if db_entry is None:
                raise RuntimeError("Could not find inserted user entry.")

        # The DB stores only a location ID; replace it with a Location object,
        # or with None when Location.get() rejects the ID with ValueError.
        try:
            db_entry["location"] = await Location.get(db_entry["location"])  # type: ignore
        except ValueError:
            db_entry["location"] = None  # type: ignore

        return cls(**db_entry)

    @classmethod
    async def from_dict(cls, **kwargs) -> "PyroUser":
        """Build a user object from keyword arguments shaped like a DB entry.

        If `location` is passed, it is resolved through `Location.get()` and
        replaced by the result, or by `None` when that call raises `ValueError`.

        ### Returns:
            * `PyroUser`: User object built from the provided arguments.
        """
        if "location" in kwargs:
            try:
                kwargs["location"] = await Location.get(kwargs["location"])  # type: ignore
            except ValueError:
                kwargs["location"] = None  # type: ignore

        return cls(**kwargs)

    async def update_locale(self, locale: Union[str, None]) -> Union[str, None]:
        """Change user's locale stored in the database.

        ### Args:
            * locale (`Union[str, None]`): New locale to be set.

        ### Returns:
            * `Union[str, None]`: Locale that is now set on the object.
        """
        logger.debug("%s's locale has been set to %s", self.id, locale)
        await col_users.update_one({"_id": self._id}, {"$set": {"locale": locale}})
        self.locale = locale
        return self.locale

    async def update_state(self, enabled: bool = False) -> bool:
        """Change user's `enabled` state stored in the database.

        ### Args:
            * enabled (`bool`, *optional*): New state to be set. Defaults to `False`.

        ### Returns:
            * `bool`: State that is now set on the object.
        """
        logger.debug("%s's state has been set to %s", self.id, enabled)
        await col_users.update_one({"_id": self._id}, {"$set": {"enabled": enabled}})
        self.enabled = enabled
        return self.enabled

    async def update_location(self, location_id: int = 0) -> Location:
        """Change user's location stored in the database.

        When the user already has a location and the new one is in a different
        timezone, the stored reminder time is shifted so that the reminder
        keeps the same wall-clock time in the new timezone.

        ### Args:
            * location_id (`int`, *optional*): ID of the new location. Defaults to `0`.

        ### Returns:
            * `Location`: Location object that is now set on the object.
        """
        logger.debug("%s's location has been set to %s", self.id, location_id)

        # NOTE(review): the DB is updated before Location.get() runs and that
        # call is not guarded here, so if it raises, the stored location ID has
        # already changed while `self.location` has not - confirm this is intended.
        await col_users.update_one(
            {"_id": self._id}, {"$set": {"location": location_id}}
        )

        location = await Location.get(location_id)

        # Execute if timezones of old and new locations are different
        if self.location and (self.location.timezone.zone != location.timezone.zone):
            # Stored hour and minute are applied to a UTC datetime here
            now_utc = datetime.now(pytz.utc).replace(
                hour=self.time_hour, minute=self.time_minute, second=0, microsecond=0
            )

            # Wall-clock reminder time as seen in the old location...
            local_old = now_utc.astimezone(self.location.timezone)

            # ...re-interpreted in the new location and converted back to UTC
            local_new = (
                location.timezone.localize(local_old.replace(tzinfo=None))
            ).astimezone(pytz.utc)

            # Update the time to match the new timezone
            await self.update_time(hour=local_new.hour, minute=local_new.minute)

        self.location = location
        return self.location

    async def update_offset(self, offset: int = 1) -> int:
        """Change user's offset stored in the database.

        ### Args:
            * offset (`int`, *optional*): New offset to be set. Defaults to `1`.

        ### Returns:
            * `int`: Offset that is now set on the object.
        """
        logger.debug("%s's offset has been set to %s", self.id, offset)
        await col_users.update_one({"_id": self._id}, {"$set": {"offset": offset}})
        self.offset = offset
        return self.offset

    async def update_time(self, hour: int = 16, minute: int = 0) -> Tuple[int, int]:
        """Change user's reminder time stored in the database.

        ### Args:
            * hour (`int`, *optional*): New reminder hour. Defaults to `16`.
            * minute (`int`, *optional*): New reminder minute. Defaults to `0`.

        ### Returns:
            * `Tuple[int, int]`: Hour and minute that are now set on the object.
        """
        logger.debug("%s's time has been set to %s h. %s m.", self.id, hour, minute)
        await col_users.update_one(
            {"_id": self._id}, {"$set": {"time_hour": hour, "time_minute": minute}}
        )
        self.time_hour = hour
        self.time_minute = minute
        return self.time_hour, self.time_minute

    async def delete(self) -> None:
        """Delete user's entry from the database.

        The object itself is left untouched.
        """
        logger.debug("%s's data has been deleted", self.id)
        await col_users.delete_one({"_id": self._id})

    async def checkout(self) -> Mapping[str, Any]:
        """Get user's entry as it is currently stored in the database.

        ### Raises:
            * `KeyError`: DB entry of the user does not exist.

        ### Returns:
            * `Mapping[str, Any]`: DB entry of the user without the `_id` field.
        """
        logger.debug("%s's data has been checked out", self.id)

        db_entry = await col_users.find_one({"_id": self._id})

        if db_entry is None:
            raise KeyError(
                f"DB record with id {self._id} of user {self.id} is not found"
            )

        del db_entry["_id"]  # type: ignore

        return db_entry

    def get_reminder_date(self) -> datetime:
        """Get next reminder date (year, month and day)

        ### Raises:
            * `AttributeError`: User does not have a location set

        ### Returns:
            * `datetime`: Midnight of tomorrow in the timezone of user's location (UTC if the location has no timezone)
        """
        if self.location is None:
            logger.warning(
                "Could not get the reminder date for %s: User does not have some attribute(s) set",
                self.id,
            )
            raise AttributeError(
                f"Could not get the reminder date for {self.id}: User does not have some attribute(s) set"
            )

        if not self.location.timezone:
            logger.warning("Location %s does not have a timezone set", self.location.id)

        tz = self.location.timezone or pytz.utc

        # Build tomorrow's midnight as a naive wall-clock value and attach the
        # timezone with localize() afterwards. Arithmetic and replace() on a
        # pytz-aware datetime keep the UTC offset that was valid "now", which
        # is wrong when a DST transition happens before tomorrow's midnight.
        tomorrow = datetime.now(tz) + timedelta(days=1)
        naive_midnight = tomorrow.replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
        )

        return tz.localize(naive_midnight)

    def get_reminder_time(self) -> datetime:
        """Get reminder time (hour and minute)

        The stored hour and minute are applied to the current UTC date and the
        result is converted to the timezone of user's location.

        ### Raises:
            * `AttributeError`: Some attribute(s) are missing

        ### Returns:
            * `datetime`: Datetime object of the reminder time in the timezone of user's location (UTC if the location has no timezone)
        """
        if self.time_hour is None or self.time_minute is None or self.location is None:
            logger.warning(
                "Could not get the reminder time for %s: User does not have some attribute(s) set",
                self.id,
            )
            raise AttributeError(
                f"Could not get the reminder time for {self.id}: User does not have some attribute(s) set"
            )

        if not self.location.timezone:
            logger.warning("Location %s does not have a timezone set", self.location.id)

        reminder_utc = datetime.now(pytz.utc).replace(
            hour=self.time_hour,
            minute=self.time_minute,
            second=0,
            microsecond=0,
        )

        return reminder_utc.astimezone(self.location.timezone or pytz.utc)