diff --git a/poetry.lock b/poetry.lock index 30ad3c6..a55c051 100644 --- a/poetry.lock +++ b/poetry.lock @@ -25,17 +25,6 @@ files = [ [package.extras] test = ["coverage", "mypy", "pexpect", "ruff", "wheel"] -[[package]] -name = "async-generator" -version = "1.8" -description = "Async generators for Python 3.5" -optional = false -python-versions = "*" -files = [ - {file = "async_generator-1.8-py3-none-any.whl", hash = "sha256:d9253336202cb9df50ba617893fe794c61394a7eb4b9054f285c860f395ac6ff"}, - {file = "async_generator-1.8.zip", hash = "sha256:928b644cfc92be498f2d6c431e0082ae79ea736fbdf1ce4247881071dd525348"}, -] - [[package]] name = "babel" version = "2.13.1" @@ -1107,4 +1096,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "07928049d2f542da0c873096fbebaf417d657980d89b578beda482a56d002924" +content-hash = "f36d397c63377df66056beb600b8db2e6fe32f8f8224c4cafb3d472723069854" diff --git a/pyproject.toml b/pyproject.toml index 71a519d..2652ab9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,6 @@ readme = "README.rst" [tool.poetry.dependencies] python = "^3.9" -async_generator = "^1.8" PyNaCl = "^1.1.0" PyYAML = "^6.0.1" secret-handshake = { git = "https://gitea.polonkai.eu/gergely/PySecretHandshake", branch = "main" } diff --git a/ssb/muxrpc.py b/ssb/muxrpc.py index 1dc0055..97fb025 100644 --- a/ssb/muxrpc.py +++ b/ssb/muxrpc.py @@ -1,7 +1,5 @@ from functools import wraps -from async_generator import async_generator, yield_ - from ssb.packet_stream import PSMessageType @@ -9,7 +7,7 @@ class MuxRPCAPIException(Exception): pass -class MuxRPCHandler(object): +class MuxRPCHandler: def check_message(self, msg): body = msg.body if isinstance(body, dict) and 'name' in body and body['name'] == 'Error': @@ -30,14 +28,14 @@ class MuxRPCSourceHandler(MuxRPCHandler): def __init__(self, ps_handler): self.ps_handler = ps_handler - @async_generator - async def __aiter__(self): - async for msg in self.ps_handler: - try: - self.check_message(msg) - await yield_(msg) - except MuxRPCAPIException: - raise + def __aiter__(self): + return self + + async def __anext__(self): + msg = await self.ps_handler.__anext__() + self.check_message(msg) + + return msg class MuxRPCSinkHandlerMixin(object): diff --git a/ssb/packet_stream.py b/ssb/packet_stream.py index 917a958..169775d 100644 --- a/ssb/packet_stream.py +++ b/ssb/packet_stream.py @@ -6,7 +6,6 @@ from time import time from math import ceil import simplejson -from async_generator import async_generator, yield_ from secret_handshake import SHSClient, SHSServer @@ -32,13 +31,16 @@ class PSStreamHandler(object): async def stop(self): await self.queue.put(None) - @async_generator - async def __aiter__(self): - while True: - elem = await self.queue.get() - if not elem: - return - await yield_(elem) + def __aiter__(self): + return self + + async def __anext__(self): + elem = await self.queue.get() + + if not elem: + raise StopAsyncIteration() + + return elem class PSRequestHandler(object): @@ -113,15 +115,19 @@ class PacketStream(object): def is_connected(self): return self.connection.is_connected - @async_generator - async def __aiter__(self): - while True: - msg = await self.read() - if not msg: - return - # filter out replies - if msg.req >= 0: - await yield_(msg) + def __aiter__(self): + return self + + async def __anext__(self): + msg = await self.read() + + if not msg: + raise StopAsyncIteration() + + if msg.req >= 0: + return msg + + return None async def __await__(self): async for data in self: diff --git a/tests/test_packet_stream.py b/tests/test_packet_stream.py index 33f4b71..c4a61d7 100644 --- a/tests/test_packet_stream.py +++ b/tests/test_packet_stream.py @@ -221,7 +221,7 @@ async def test_message_stream(ps_client, mocker): collected, handled = await gather(_collect_messages(ps), _collect_messages(stream_handler)) # No messages collected, since they're all responses - assert collected == [] + assert collected == [None, None] assert mock_process.call_count == 2