diff --git a/poetry.lock b/poetry.lock index 7861176..8c3db10 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1310,4 +1310,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "bd8b3213143f1abe13f580d28e2d42ee3d663c2d010548e7acd27be04912308a" +content-hash = "d57dc0c074d7daf70507fda1fc9641cf367b6dc8f02b34a5fceafe6b45c0f4f9" diff --git a/pyproject.toml b/pyproject.toml index da65dd5..553054b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,6 @@ readme = "README.rst" [tool.poetry.dependencies] python = "^3.9" -async_generator = "^1.8" PyNaCl = "^1.1.0" PyYAML = "^6.0.1" secret-handshake = { version = "0.1.0.dev3", allow-prereleases = true } diff --git a/ssb/muxrpc.py b/ssb/muxrpc.py index f48ad15..170cd52 100644 --- a/ssb/muxrpc.py +++ b/ssb/muxrpc.py @@ -22,8 +22,6 @@ """MuxRPC""" -from async_generator import async_generator, yield_ - from ssb.packet_stream import PSMessageType @@ -61,11 +59,14 @@ class MuxRPCSourceHandler(MuxRPCHandler): def __init__(self, ps_handler): self.ps_handler = ps_handler - @async_generator - async def __aiter__(self): - async for msg in self.ps_handler: - self.check_message(msg) - await yield_(msg) + def __aiter__(self): + return self + + async def __anext__(self): + msg = await self.ps_handler.__anext__() + self.check_message(msg) + + return msg class MuxRPCSinkHandlerMixin: # pylint: disable=too-few-public-methods diff --git a/ssb/packet_stream.py b/ssb/packet_stream.py index e0ba88a..b51e25c 100644 --- a/ssb/packet_stream.py +++ b/ssb/packet_stream.py @@ -29,7 +29,6 @@ from math import ceil import struct from time import time -from async_generator import async_generator, yield_ import simplejson logger = logging.getLogger("packet_stream") @@ -61,13 +60,16 @@ class PSStreamHandler: await self.queue.put(None) - @async_generator - async def __aiter__(self): - while True: - elem = await self.queue.get() - if not elem: - return - await yield_(elem) + def __aiter__(self): + return self + + async def __anext__(self): + elem = await self.queue.get() + + if not elem: + raise StopAsyncIteration() + + return elem class PSRequestHandler: @@ -165,17 +167,18 @@ class PacketStream: return self.connection.is_connected - @async_generator - async def __aiter__(self): + def __aiter__(self): + return self + + async def __anext__(self): while True: msg = await self.read() if not msg: - return + raise StopAsyncIteration() - # filter out replies if msg.req >= 0: - await yield_(msg) + return msg async def __await__(self): async for data in self: