commit fe95688f9d4d3ddd5e7a3e73a4019bcb2c71dcc7 Author: Pedro Ferreira Date: Thu May 25 12:47:01 2017 +0200 First partially working implementation diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..13da680 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.cache +__pycache__ +*.pyc +node_modules diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bb0df3b --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +pynacl +simplejson +pytest diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..aa079ec --- /dev/null +++ b/setup.cfg @@ -0,0 +1,2 @@ +[flake8] +max-line-length=120 diff --git a/ssb/__init__.py b/ssb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ssb/boxstream.py b/ssb/boxstream.py new file mode 100644 index 0000000..2e35763 --- /dev/null +++ b/ssb/boxstream.py @@ -0,0 +1,77 @@ +import struct +from nacl.secret import SecretBox + +from .util import bytes_to_long, long_to_bytes + +NONCE_SIZE = 24 +HEADER_LENGTH = 2 + 16 + 16 +MAX_SEGMENT_SIZE = 4 * 1024 +TERMINATION_HEADER = (b'\x00' * HEADER_LENGTH) +MAX_NONCE = (8 * NONCE_SIZE) + + +def inc_nonce(nonce): + num = bytes_to_long(nonce) + 1 + if num > 2**MAX_NONCE: + num = 0 + bnum = long_to_bytes(num) + bnum = b'\x00' * (NONCE_SIZE - len(bnum)) + bnum + return bnum + + +def get_stream_pair(reader, writer, **kwargs): + shared_secret = kwargs.pop('shared_secret') + return UnboxStream(reader, shared_secret, **kwargs), BoxStream(writer, shared_secret, **kwargs) + + +class UnboxStream(object): + def __init__(self, reader, shared_secret, **key_data): + self.reader = reader + self.decrypt_key = key_data.get('decrypt_key') + self.decrypt_nonce = key_data.get('decrypt_nonce') + + async def process(self): + while True: + data = await self.reader.read(HEADER_LENGTH) + if not data: + break + + box = SecretBox(self.decrypt_key) + header = box.decrypt(data, self.decrypt_nonce) + + if header == TERMINATION_HEADER: + return + + length = struct.unpack('>H', header[:2])[0] + mac = header[2:] + + data = await self.reader.read(length) + + self.decrypt_nonce = inc_nonce(self.decrypt_nonce) + body = box.decrypt(mac + data, self.decrypt_nonce) + + self.decrypt_nonce = inc_nonce(self.decrypt_nonce) + yield body + print('Disconnect') + + +class BoxStream(object): + def __init__(self, writer, shared_secret, **key_data): + self.writer = writer + self.encrypt_key = key_data.get('decrypt_key') + self.encrypt_nonce = key_data.get('decrypt_nonce') + + async def write(self, data): + box = SecretBox(self.encrypt_key) + + # XXX: This nonce logic is almost for sure wrong + + self.encrypt_nonce = inc_nonce(self.encrypt_nonce) + + body = box.encrypt(data, self.encrypt_nonce) + header = struct.pack('>H', len(body)) + body[:16] + + self.writer.write(box.encrypt(header, self.encrypt_nonce)) + + self.encrypt_nonce = inc_nonce(self.encrypt_nonce) + self.writer.write(body) diff --git a/ssb/feed.py b/ssb/feed.py new file mode 100644 index 0000000..cc6c289 --- /dev/null +++ b/ssb/feed.py @@ -0,0 +1,61 @@ +import time +from base64 import b64encode +from collections import namedtuple, OrderedDict +from hashlib import sha256 + +from simplejson import dumps, loads + + +OrderedMsg = namedtuple('OrderedMsg', ('previous', 'author', 'sequence', 'timestamp', 'hash', 'content')) + + +def to_ordered(data): + smsg = OrderedMsg(**data) + return OrderedDict((k, getattr(smsg, k)) for k in smsg._fields) + + +class Message(object): + def __init__(self, keypair, content, timestamp=None, previous=None): + self.keypair = keypair + self.content = content + self.previous = previous + self.sequence = (self.previous.sequence + 1) if self.previous else 1 + self.timestamp = int(time.time() * 1000) if timestamp is None else timestamp + + @classmethod + def parse(cls, data, keypair): + obj = loads(data, object_pairs_hook=OrderedDict) + msg = cls(keypair, obj['content'], timestamp=obj['timestamp']) + return msg, obj['signature'] + + def to_dict(self, add_signature=True): + obj = to_ordered({ + 'previous': self.previous.key if self.previous else None, + 'author': self.keypair.tag, + 'sequence': self.sequence, + 'timestamp': self.timestamp, + 'hash': 'sha256', + 'content': self.content + }) + + if add_signature: + obj['signature'] = self.signature + return obj + + @property + def signature(self): + # ensure ordering of keys and indentation of 2 characters, like ssb-keys + data = dumps(self.to_dict(add_signature=False), indent=2) + return (b64encode(bytes(self.keypair.sign(data.encode('ascii')))) + b'.sig.ed25519').decode('ascii') + + def verify(self, signature): + return self.signature == signature + + @property + def hash(self): + hash = sha256(dumps(self.to_dict(), indent=2).encode('ascii')).digest() + return b64encode(hash).decode('ascii') + '.sha256' + + @property + def key(self): + return '%' + self.hash diff --git a/ssb/keys.py b/ssb/keys.py new file mode 100644 index 0000000..fe29da2 --- /dev/null +++ b/ssb/keys.py @@ -0,0 +1,28 @@ +from base64 import b64encode + +from nacl.signing import SigningKey + + +def tag(key): + """Create tag from publick key.""" + return b'@' + b64encode(bytes(key)) + b'.ed25519' + + +class KeyPair(object): + def __init__(self, seed=None): + self.private_key = SigningKey.generate() if seed is None else SigningKey(seed) + self.public_key = self.private_key.verify_key + + @property + def tag(self): + return tag(self.public_key).decode('ascii') + + @property + def private_tag(self): + return tag(self.private_key) + + def sign(self, data): + return self.private_key.sign(data).signature + + def __repr__(self): + return "".format(self) diff --git a/ssb/packet_stream.py b/ssb/packet_stream.py new file mode 100644 index 0000000..f644ebb --- /dev/null +++ b/ssb/packet_stream.py @@ -0,0 +1,77 @@ +import struct +from enum import Enum + +from .shs.socket import SHSClient, SHSServer + +import simplejson + + +class PSMessageType(Enum): + BUFFER = 0 + TEXT = 1 + JSON = 2 + + +class PSMessage(object): + def __init__(self, stream, end_err, type_, body): + self.stream = stream + self.end_err = end_err + self.type = PSMessageType(type_) + self.body = body + + @property + def data(self): + if self.type == PSMessageType.TEXT: + return self.body.decode('utf-8') + elif self.type == PSMessageType.JSON: + return simplejson.loads(self.body) + return self.body + + def __repr__(self): + return ''.format(self.type.name, self.data) + + +class PSSocket(object): + async def read(self): + while True: + try: + header = await self.connection.read().__anext__() + body = await self.connection.read().__anext__() + flags, length, req = struct.unpack('>BIi', header) + yield PSMessage(bool(flags & 0x08), bool(flags & 0x04), flags & 0x03, body) + except StopAsyncIteration: + await self.connection.disconnect() + break + + async def write(self, type_, data, req=0): + type_ = PSMessageType[type_] + if type_ == PSMessageType.JSON: + data = simplejson.dumps(data) + + # XXX: Not yet handling flags that nicely + + header = struct.pack('>BIi', 0x08 | type_.value, len(data), req) + await self.connection.write(header) + await self.connection.write(data.encode('utf-8')) + + +class PSClient(PSSocket): + def __init__(self, host, port, client_kp, server_pub_key, ephemeral_key=None, application_key=None): + self.connection = SHSClient(host, port, client_kp, server_pub_key, ephemeral_key=ephemeral_key, + application_key=application_key) + + async def connect(self, loop=None): + await self.connection.connect(loop=loop) + + +class PSServer(PSSocket): + def __init__(self, host, port, client_kp, application_key=None): + self.connection = SHSServer(host, port, client_kp, application_key=application_key) + + async def listen(self, loop=None): + await self.connection.listen(loop=loop) + + def on_connect(self, handler): + async def _on_connect(): + await handler(self) + self.connection._on_connect = _on_connect diff --git a/ssb/shs/__init__.py b/ssb/shs/__init__.py new file mode 100644 index 0000000..747a227 --- /dev/null +++ b/ssb/shs/__init__.py @@ -0,0 +1,4 @@ +from .crypto import SHSClientCrypto, SHSServerCrypto +from .socket import SHSClient, SHSServer + +__all__ = ('SHSClient', 'SHSClientCrypto', 'SHSServer', 'SHSServerCrypto') diff --git a/ssb/shs/crypto.py b/ssb/shs/crypto.py new file mode 100644 index 0000000..d49367c --- /dev/null +++ b/ssb/shs/crypto.py @@ -0,0 +1,160 @@ +import hashlib +import hmac +from base64 import b64decode + +from nacl.bindings import crypto_scalarmult, crypto_box_afternm, crypto_box_open_afternm +from nacl.exceptions import CryptoError +from nacl.public import PrivateKey +from nacl.signing import VerifyKey + +APPLICATION_KEY = b64decode('1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=') + + +class SHSError(Exception): + """A SHS exception.""" + pass + + +class SHSCryptoBase(object): + def __init__(self, local_key, ephemeral_key=None, application_key=None): + self.local_key = local_key + self.application_key = application_key or APPLICATION_KEY + self._reset_keys(ephemeral_key or PrivateKey.generate()) + + def _reset_keys(self, ephemeral_key): + self.local_ephemeral_key = ephemeral_key + self.local_app_hmac = (hmac.new(self.application_key, bytes(ephemeral_key.public_key), digestmod='sha512') + .digest()[:32]) + + def generate_challenge(self): + """Generate and return a challenge to be sent to the server.""" + return self.local_app_hmac + bytes(self.local_ephemeral_key.public_key) + + def verify_challenge(self, data): + """Verify the correctness of challenge sent from the client.""" + assert len(data) == 64 + sent_hmac, remote_ephemeral_key = data[:32], data[32:] + + h = hmac.new(self.application_key, remote_ephemeral_key, digestmod='sha512') + self.remote_app_hmac = h.digest()[:32] + ok = self.remote_app_hmac == sent_hmac + + if ok: + # this is (a * b) + self.shared_secret = crypto_scalarmult(bytes(self.local_ephemeral_key), remote_ephemeral_key) + self.remote_ephemeral_key = remote_ephemeral_key + # this is hash(a * b) + self.shared_hash = hashlib.sha256(self.shared_secret).digest() + return ok + + def clean(self, new_ephemeral_key=None): + self._reset_keys(new_ephemeral_key or PrivateKey.generate()) + self.shared_secret = None + self.shared_hash = None + self.remote_ephemeral_key = None + + def get_box_keys(self): + shared_secret = hashlib.sha256(self.box_secret).digest() + return { + 'shared_secret': shared_secret, + 'encrypt_key': hashlib.sha256(shared_secret + bytes(self.remote_pub_key)).digest(), + 'decrypt_key': hashlib.sha256(shared_secret + bytes(self.local_key.verify_key)).digest(), + 'encrypt_nonce': self.remote_app_hmac[:24], + 'decrypt_nonce': self.local_app_hmac[:24] + } + + +class SHSServerCrypto(SHSCryptoBase): + def verify_client_auth(self, data): + assert len(data) == 112 + a_bob = crypto_scalarmult(bytes(self.local_key.to_curve25519_private_key()), self.remote_ephemeral_key) + box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob).digest() + self.hello = crypto_box_open_afternm(data, b'\x00' * 24, box_secret) + signature, public_key = self.hello[:64], self.hello[64:] + signed = self.application_key + bytes(self.local_key.verify_key) + self.shared_hash + pkey = VerifyKey(public_key) + + # will raise an exception if verification fails + pkey.verify(signed, signature) + self.remote_pub_key = pkey + b_alice = crypto_scalarmult(bytes(self.local_ephemeral_key), + bytes(self.remote_pub_key.to_curve25519_public_key())) + self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob + b_alice).digest()[:32] + return True + + def generate_accept(self): + okay = self.local_key.sign(self.application_key + self.hello + self.shared_hash).signature + d = crypto_box_afternm(okay, b'\x00' * 24, self.box_secret) + return d + + def clean(self, new_ephemeral_key=None): + super(SHSServerCrypto, self).clean(new_ephemeral_key=new_ephemeral_key) + self.hello = None + self.local_lterm_shared = None + + +class SHSClientCrypto(SHSCryptoBase): + """An object that encapsulates all the SHS client-side crypto. + + :param local_key: the :class:`ssb.keys.KeyPair` used by the client + :param local_ephemeral_key: a fresh local :class:`nacl.public.PrivateKey` + :param server_pub_key: the server's public key (``byte`` string) + :param application_key: the unique application key (``byte`` string), defaults to SSB's + """ + + def __init__(self, local_key, server_pub_key, ephemeral_key, application_key=None): + super(SHSClientCrypto, self).__init__(local_key, ephemeral_key, application_key) + self.remote_pub_key = VerifyKey(server_pub_key) + + def verify_server_challenge(self, data): + """Verify the correctness of challenge sent from the server.""" + # TODO: use super.verify_challenge and add extra logic + return super(SHSClientCrypto, self).verify_challenge(data) + + def generate_client_auth(self): + """Generate box[K|a*b|a*B](H)""" + curve_pkey = self.remote_pub_key.to_curve25519_public_key() + + # remote_lterm_shared is (a * B) + remote_lterm_shared = crypto_scalarmult(bytes(self.local_ephemeral_key), bytes(curve_pkey)) + self.remote_lterm_shared = remote_lterm_shared + + # this shall be hash(K | a * b | a * B) + box_secret = hashlib.sha256(self.application_key + self.shared_secret + remote_lterm_shared).digest() + + # and message_to_box will correspond to H = sign(A)[K | Bp | hash(a * b)] | Ap + signed_message = self.local_key.sign(self.application_key + bytes(self.remote_pub_key) + self.shared_hash) + message_to_box = signed_message.signature + bytes(self.local_key.verify_key) + self.client_auth = message_to_box + + nonce = b"\x00" * 24 + # return box(K | a * b | a * B)[H] + return crypto_box_afternm(message_to_box, nonce, box_secret) + + def verify_server_accept(self, data): + """Verify that the server's accept message is sane""" + curve_lkey = self.local_key.to_curve25519_private_key() + # local_lterm_shared is (A * b) + local_lterm_shared = crypto_scalarmult(bytes(curve_lkey), self.remote_ephemeral_key) + self.local_lterm_shared = local_lterm_shared + # this is hash(K | a * b | a * B | A * b) + self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + self.remote_lterm_shared + + local_lterm_shared).digest() + + nonce = b"\x00" * 24 + + try: + # let's use the box secret to unbox our encrypted message + signature = crypto_box_open_afternm(data, nonce, self.box_secret) + except CryptoError: + raise SHSError('Error decrypting server acceptance message') + + # we should have received sign(B)[K | H | hash(a * b)] + # let's see if that signature can verify the reconstructed data on our side + self.remote_pub_key.verify(self.application_key + self.client_auth + self.shared_hash, signature) + return True + + def clean(self, new_ephemeral_key=None): + super(SHSClientCrypto, self).clean(new_ephemeral_key=new_ephemeral_key) + self.remote_lterm_shared = None + self.local_lterm_shared = None diff --git a/ssb/shs/socket.py b/ssb/shs/socket.py new file mode 100644 index 0000000..ee3db58 --- /dev/null +++ b/ssb/shs/socket.py @@ -0,0 +1,88 @@ +from asyncio import open_connection, start_server + +from ..boxstream import get_stream_pair +from .crypto import SHSClientCrypto, SHSServerCrypto + + +class SHSClientException(Exception): + pass + + +class SHSSocket(object): + async def read(self): + async for msg in self.read_stream.process(): + yield msg + + async def write(self, data): + await self.write_stream.write(data) + + async def disconnect(self): + self.writer.close() + + +class SHSServer(SHSSocket): + def __init__(self, host, port, server_kp, application_key=None): + self.host = host + self.port = port + self.crypto = SHSServerCrypto(server_kp.private_key, application_key=application_key) + self._on_connect = None + + async def _handshake(self, reader, writer): + data = await reader.read(64) + if not self.crypto.verify_challenge(data): + raise SHSClientException('Client challenge is not valid') + + writer.write(self.crypto.generate_challenge()) + + data = await reader.read(112) + if not self.crypto.verify_client_auth(data): + raise SHSClientException('Client auth is not valid') + + writer.write(self.crypto.generate_accept()) + + async def handle_connection(self, reader, writer): + self.crypto.clean() + await self._handshake(reader, writer) + + keys = self.crypto.get_box_keys() + self.crypto.clean() + + self.read_stream, self.write_stream = get_stream_pair(reader, writer, **keys) + self.writer = writer + + if self._on_connect: + await self._on_connect() + + async def listen(self, loop=None): + await start_server(self.handle_connection, self.host, self.port, loop=loop) + + +class SHSClient(SHSSocket): + def __init__(self, host, port, client_kp, server_pub_key, ephemeral_key=None, application_key=None): + self.host = host + self.port = port + self.crypto = SHSClientCrypto(client_kp.private_key, server_pub_key, ephemeral_key=ephemeral_key, + application_key=application_key) + + async def _handshake(self, reader, writer): + writer.write(self.crypto.generate_challenge()) + + data = await reader.read(64) + if not self.crypto.verify_server_challenge(data): + raise SHSClientException('Server challenge is not valid') + + writer.write(self.crypto.generate_client_auth()) + + data = await reader.read(80) + if not self.crypto.verify_server_accept(data): + raise SHSClientException('Server accept is not valid') + + async def connect(self, loop=None): + reader, writer = await open_connection(self.host, self.port, loop=loop) + await self._handshake(reader, writer) + + keys = self.crypto.get_box_keys() + self.crypto.clean() + + self.read_stream, self.write_stream = get_stream_pair(reader, writer, **keys) + self.writer = writer diff --git a/ssb/tests/test_feed.py b/ssb/tests/test_feed.py new file mode 100644 index 0000000..34e8b40 --- /dev/null +++ b/ssb/tests/test_feed.py @@ -0,0 +1,48 @@ +from base64 import b64decode +from collections import OrderedDict + +import pytest + +from ssb.keys import KeyPair, Message + + +@pytest.fixture() +def keypair(): + secret = b64decode('Mz2qkNOP2K6upnqibWrR+z8pVUI1ReA1MLc7QMtF2qQ=') + return KeyPair(secret) + + +def test_keypair(): + secret = b64decode('Mz2qkNOP2K6upnqibWrR+z8pVUI1ReA1MLc7QMtF2qQ=') + kp = KeyPair(secret) + assert bytes(kp.private_key) == secret + assert bytes(kp.public_key) == b64decode('I/4cyN/jPBbDsikbHzAEvmaYlaJK33lW3UhWjNXjyrU=') + assert kp.tag == '@I/4cyN/jPBbDsikbHzAEvmaYlaJK33lW3UhWjNXjyrU=.ed25519' + + +def test_message(keypair): + m1 = Message(keypair, OrderedDict([ + ('type', 'about'), + ('about', keypair.tag), + ('name', 'neo'), + ('description', 'The Chosen One') + ]), timestamp=1495706260190) + assert m1.timestamp == 1495706260190 + assert m1.previous is None + assert m1.sequence == 1 + assert m1.signature == \ + 'lPsQ9P10OgeyH6u0unFgiI2wV/RQ7Q2x2ebxnXYCzsJ055TBMXphRADTKhOMS2EkUxXQ9k3amj5fnWPudGxwBQ==.sig.ed25519' + assert m1.key == '%xRDqws/TrQmOd4aEwZ32jdLhP873ZKjIgHlggPR0eoo=.sha256' + + m2 = Message(keypair, OrderedDict([ + ('type', 'about'), + ('about', keypair.tag), + ('name', 'morpheus'), + ('description', 'Dude with big jaw') + ]), previous=m1, timestamp=1495706447426) + assert m2.timestamp == 1495706447426 + assert m2.previous is m1 + assert m2.sequence == 2 + assert m2.signature == \ + '3SY85LX6/ppOfP4SbfwZbKfd6DccbLRiB13pwpzbSK0nU52OEJxOqcJ2Uensr6RkrWztWLIq90sNOn1zRAoOAw==.sig.ed25519' + assert m2.key == '%nx13uks5GUwuKJC49PfYGMS/1pgGTtwwdWT7kbVaroM=.sha256' diff --git a/ssb/tests/test_shs.py b/ssb/tests/test_shs.py new file mode 100644 index 0000000..dee1e97 --- /dev/null +++ b/ssb/tests/test_shs.py @@ -0,0 +1,17 @@ +import hashlib + +import pytest +from nacl.public import PrivateKey + +from ssb.shs import SecretHandShake + + +@pytest.fixture() +def appkey(): + return hashlib.sha256(b'app_key').digest() + + +def test_client_challenge(appkey): + pk = PrivateKey.generate() + shs = SecretHandShake(pk, application_key=appkey) + assert shs.client_challenge diff --git a/ssb/util.py b/ssb/util.py new file mode 100644 index 0000000..27523f8 --- /dev/null +++ b/ssb/util.py @@ -0,0 +1,53 @@ +import struct + +# Stolen from PyCypto (Public Domain) + + +def b(s): + return s.encode("latin-1") # utf-8 would cause some side-effects we don't want + + +def long_to_bytes(n, blocksize=0): + """long_to_bytes(n:long, blocksize:int) : string + Convert a long integer to a byte string. + If optional blocksize is given and greater than zero, pad the front of the + byte string with binary zeros so that the length is a multiple of + blocksize. + """ + # after much testing, this algorithm was deemed to be the fastest + s = b('') + pack = struct.pack + while n > 0: + s = pack('>I', n & 0xffffffff) + s + n = n >> 32 + # strip off leading zeros + for i in range(len(s)): + if s[i] != b('\000')[0]: + break + else: + # only happens when n == 0 + s = b('\000') + i = 0 + s = s[i:] + # add back some pad bytes. this could be done more efficiently w.r.t. the + # de-padding being done above, but sigh... + if blocksize > 0 and len(s) % blocksize: + s = (blocksize - len(s) % blocksize) * b('\000') + s + return s + + +def bytes_to_long(s): + """bytes_to_long(string) : long + Convert a byte string to a long integer. + This is (essentially) the inverse of long_to_bytes(). + """ + acc = 0 + unpack = struct.unpack + length = len(s) + if length % 4: + extra = (4 - length % 4) + s = b('\000') * extra + s + length = length + extra + for i in range(0, length, 4): + acc = (acc << 32) + unpack('>I', s[i:i+4])[0] + return acc diff --git a/test_client.py b/test_client.py new file mode 100644 index 0000000..825b795 --- /dev/null +++ b/test_client.py @@ -0,0 +1,19 @@ +from asyncio import get_event_loop +from base64 import b64decode + +from ssb.keys import KeyPair +from ssb.packet_stream import PSClient + +server_pub_key = b64decode('--- your public key ---') + + +async def main(loop): + await packet_stream.connect(loop) + async for msg in packet_stream.read(): + print(msg) + print('bye') + +packet_stream = PSClient('127.0.0.1', 8008, KeyPair(), server_pub_key) +loop = get_event_loop() +loop.run_until_complete(main(loop)) +loop.close() diff --git a/test_server.py b/test_server.py new file mode 100644 index 0000000..f9fd870 --- /dev/null +++ b/test_server.py @@ -0,0 +1,31 @@ +from asyncio import get_event_loop +from base64 import b64decode + +from ssb.keys import KeyPair +from ssb.packet_stream import PSServer + + +priv_key = b64decode('--- your private key ---') + + +async def main(loop): + await packet_stream.listen(loop) + + +async def on_connect(server): + await server.write('JSON', {"name": ["createHistoryStream"], + "args": [{ + "id": "@/Odg52x38pt7OivNnxK1Lm+H/+x6yV4DhMeXHBQRYc0=.ed25519", + "seq": 9, + "live": True, + "keys": False + }], + "type": "source"}, req=1) + print(await server.read().__anext__()) + +packet_stream = PSServer('127.0.0.1', 8008, KeyPair(priv_key[:32])) +packet_stream.on_connect(on_connect) +loop = get_event_loop() +loop.run_until_complete(main(loop)) +loop.run_forever() +loop.close()