add async iterator and async context manager

This commit is contained in:
YoggieS
2024-09-15 20:14:16 +02:00
parent 99899860df
commit 163f07b97e
3 changed files with 62 additions and 19 deletions

View File

@@ -29,7 +29,6 @@ import asyncio
from eventapi.EventApi import EventApi
from eventapi.WebSocket import (
Dispatch,
EventType,
ResponseTypes,
SubscriptionCondition,
@@ -38,11 +37,7 @@ from eventapi.WebSocket import (
async def callback(data: ResponseTypes):
if isinstance(data, Dispatch):
print(f"Event data type: {data.type}")
print(f"Event data body: {data.body}")
else:
print(f"Raw event data: {data.raw_data}")
print("Message from callback: ", data)
async def main():
@@ -51,6 +46,7 @@ async def main():
# connect to websocket
await app.connect()
print("Connected")
# create subscription with specified condition
condition = SubscriptionCondition(object_id="6433b7cec07d26f890dd2d01")
@@ -59,7 +55,11 @@ async def main():
)
await app.subscribe(subscription_data=subscription)
# run forever
# you can also use async iterator without specifying callback
async for message in app:
print("Message from async iterator: ", message)
# run forever if we are not using async for
await asyncio.Future()
@@ -73,7 +73,7 @@ asyncio.run(main())
<img src="https://github.com/yoggys/eventapi/blob/master/assets/example_dc.png" alt="Discord Bot example" height="450px">
```python
from typing import Any
from typing import Any, Dict, List
import aiohttp
import discord
@@ -88,7 +88,7 @@ from eventapi.WebSocket import (
)
async def format_url(data: dict[str, Any]) -> str:
async def format_url(data: Dict[str, Any]) -> str:
base_url = "https://cdn.7tv.app/emote/{}/4x".format(data.get("id"))
if "animated" not in data:
async with aiohttp.ClientSession() as cs:
@@ -108,7 +108,7 @@ async def callback(data: ResponseTypes) -> None:
return
channel = client.get_channel(927288026000945162) # your channel id
changes: list[discord.Embed] = []
changes: List[discord.Embed] = []
def add_change(description: str, color: discord.Color, image_url: str):
changes.append(
@@ -148,7 +148,7 @@ client = discord.Client(intents=intents)
client.eventapi = EventApi(callback=callback)
@client.event
@client.listen("on_ready", once=True)
async def on_ready():
await client.eventapi.connect()
condition = SubscriptionCondition(

View File

@@ -1,9 +1,12 @@
import asyncio
import atexit
import json
import signal
import socket
from typing import Any, Callable, Coroutine, Optional
from typing import Any, Callable, Coroutine, Dict, List, Optional
import websockets
from typing_extensions import Self
from eventapi.Exceptions import (
ConnectionException,
@@ -35,12 +38,46 @@ class EventApi:
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.handler: Optional[asyncio.Task] = None
self.previous_session_id: Optional[str] = None
self.queue: Optional[asyncio.Queue] = None
self.session_id: Optional[str] = None
self.subscription_limit: int = 100
self.subscriptions: list[SubscriptionData] = []
self.subscription_limit: int = 500
self.subscriptions: List[SubscriptionData] = []
self.callback = callback
atexit.register(self._close_sync)
signal.signal(signal.SIGINT, self._handle_exit)
signal.signal(signal.SIGTERM, self._handle_exit)
async def __aenter__(self) -> Self:
await self.connect()
return self
async def __aexit__(self) -> None:
await self.close()
def __aiter__(self):
if not self.closed:
self.queue = asyncio.Queue()
return self
async def __anext__(self):
return await self.queue.get()
def __str__(self) -> str:
return f"<EventApi session_id='{self.session_id}' subscription_limit={self.subscription_limit} closed={self.closed}>"
@property
def closed(self) -> bool:
return self.ws is None or self.ws.closed
def _close_sync(self) -> None:
if not self.closed:
asyncio.run(self.close())
def _handle_exit(self, signum, frame) -> None:
if not self.closed:
asyncio.run(self.close())
async def connect(self) -> None:
try:
self.ws = await websockets.connect(self.WS_URL)
@@ -49,12 +86,15 @@ class EventApi:
raise ConnectionException()
async def reconnect(self) -> None:
await self.close()
await self.connect()
async def close(self) -> None:
self.handler.cancel()
self.handler = None
if not self.ws.closed:
await self.ws.close()
self.ws = None
await self.connect()
async def message_handler(self) -> None:
try:
@@ -63,16 +103,18 @@ class EventApi:
if isinstance(message, bytes):
message = message.decode("utf-8")
asyncio.create_task(self.on_message(message))
except websockets.exceptions.ConnectionClosed:
except:
asyncio.create_task(self.reconnect())
async def on_message(self, message: str) -> None:
message = json.loads(message)
parsed_message = await self.parse_message(message)
if self.queue:
await self.queue.put(parsed_message)
if self.callback and parsed_message:
asyncio.create_task(self.callback(parsed_message))
async def parse_message(self, message: dict[str, Any]) -> Optional[ResponseTypes]:
async def parse_message(self, message: Dict[str, Any]) -> Optional[ResponseTypes]:
message_data = message.get("d")
message_code = message.get("op")

View File

@@ -7,7 +7,7 @@ long_description = (cwd / "README.md").read_text()
setup(
name="eventapi",
version="1.0.4",
version="1.1.0",
author="yoggys",
author_email="yoggies@yoggies.dev",
description="Wrapper for 7TV EventAPI.",
@@ -17,6 +17,7 @@ setup(
packages=find_packages(),
install_requires=[
"websockets ~= 12.0",
"typing_extensions ~= 4.12.2",
],
classifiers=[
"Development Status :: 4 - Beta",