Handle split packets properly

This commit is contained in:
Pedro Ferreira 2017-07-30 11:51:11 +02:00
parent ae5b99acfa
commit dda0b488c6
2 changed files with 20 additions and 9 deletions

View File

@ -65,8 +65,7 @@ class PSRequestHandler(object):
class PSMessage(object): class PSMessage(object):
@classmethod @classmethod
def from_header_body(cls, header, body): def from_header_body(cls, flags, req, body):
flags, length, req = struct.unpack('>BIi', header)
type_ = PSMessageType(flags & 0x03) type_ = PSMessageType(flags & 0x03)
if type_ == PSMessageType.TEXT: if type_ == PSMessageType.TEXT:
@ -92,7 +91,11 @@ class PSMessage(object):
self.req = req self.req = req
def __repr__(self): def __repr__(self):
return '<PSMessage ({}): {}{} {}{}>'.format(self.type.name, self.body, if self.type == PSMessageType.BUFFER:
body = '{} bytes'.format(len(self.body))
else:
body = self.body
return '<PSMessage ({}): {}{} {}{}>'.format(self.type.name, body,
'' if self.req is None else ' [{}]'.format(self.req), '' if self.req is None else ' [{}]'.format(self.req),
'~' if self.stream else '', '!' if self.end_err else '') '~' if self.stream else '', '!' if self.end_err else '')
@ -107,9 +110,16 @@ class PSConnection(object):
header = await self.connection.read() header = await self.connection.read()
if not header: if not header:
return return
body = await self.connection.read() flags, length, req = struct.unpack('>BIi', header)
logger.debug('READ %s %s', header, body)
return PSMessage.from_header_body(header, body) n_packets = length // 4096 + 1
body = b''
for n in range(n_packets):
body += await self.connection.read()
logger.debug('READ %s %s', header, len(body))
return PSMessage.from_header_body(flags, req, body)
except StopAsyncIteration: except StopAsyncIteration:
logger.debug('DISCONNECT') logger.debug('DISCONNECT')
await self.connection.disconnect() await self.connection.disconnect()

View File

@ -50,9 +50,10 @@ async def main():
handler.send(True, end=True) handler.send(True, end=True)
break break
handler = api.call('blobs.add', [], 'sink') async for data in api.call('blobs.get', ['&/6q7JOKythgnnzoBI5xxvotCr5HeFkAIZSAuqHiZfLw=.sha256'], 'source'):
handler.send(b'dead0beef', msg_type=PSMessageType.BUFFER) if data.type.name == 'BUFFER':
handler.send(True, end=True) with open('./funny_img.png', 'wb') as f:
f.write(data.data)
# create console handler and set level to debug # create console handler and set level to debug
ch = logging.StreamHandler() ch = logging.StreamHandler()