@@ -1,5 +1,7 @@
|
||||
from functools import wraps
|
||||
|
||||
from async_generator import async_generator, yield_
|
||||
|
||||
from ssb.packet_stream import PSMessageType
|
||||
|
||||
|
||||
@@ -28,11 +30,12 @@ class MuxRPCSourceHandler(MuxRPCHandler):
|
||||
def __init__(self, ps_handler):
|
||||
self.ps_handler = ps_handler
|
||||
|
||||
@async_generator
|
||||
async def __aiter__(self):
|
||||
async for msg in self.ps_handler:
|
||||
try:
|
||||
self.check_message(msg)
|
||||
yield msg
|
||||
await yield_(msg)
|
||||
except MuxRPCAPIException:
|
||||
raise
|
||||
|
||||
|
@@ -4,9 +4,10 @@ from asyncio import Event, Queue
|
||||
from enum import Enum
|
||||
from time import time
|
||||
|
||||
from secret_handshake import SHSClient, SHSServer
|
||||
|
||||
import simplejson
|
||||
from async_generator import async_generator, yield_
|
||||
|
||||
from secret_handshake import SHSClient, SHSServer
|
||||
|
||||
|
||||
logger = logging.getLogger('packet_stream')
|
||||
@@ -30,12 +31,13 @@ class PSStreamHandler(object):
|
||||
async def stop(self):
|
||||
await self.queue.put(None)
|
||||
|
||||
@async_generator
|
||||
async def __aiter__(self):
|
||||
while True:
|
||||
elem = await self.queue.get()
|
||||
if not elem:
|
||||
return
|
||||
yield elem
|
||||
await yield_(elem)
|
||||
|
||||
|
||||
class PSRequestHandler(object):
|
||||
@@ -154,6 +156,7 @@ class PSConnection(object):
|
||||
def register_handler(self, handler):
|
||||
self._event_map[handler.req] = (time(), handler)
|
||||
|
||||
@async_generator
|
||||
async def __aiter__(self):
|
||||
while True:
|
||||
msg = await self.read()
|
||||
@@ -161,7 +164,7 @@ class PSConnection(object):
|
||||
return
|
||||
# filter out replies
|
||||
if msg.req >= 0:
|
||||
yield msg
|
||||
await yield_(msg)
|
||||
|
||||
def _write(self, msg):
|
||||
logger.info('SEND [%d]: %r', msg.req, msg)
|
||||
|
@@ -1,5 +1,7 @@
|
||||
import pytest
|
||||
import json
|
||||
from asyncio import ensure_future, gather, Event
|
||||
|
||||
import pytest
|
||||
from asynctest import patch
|
||||
from nacl.signing import SigningKey
|
||||
|
||||
|
Reference in New Issue
Block a user