Add load_ssb_secret() to utils

This commit is contained in:
Pedro Ferreira 2017-08-05 12:24:46 +02:00
parent fcb5981611
commit ee68cd125b
5 changed files with 59 additions and 67 deletions

View File

@ -1,16 +1,13 @@
import logging
import os
import struct
import time
from asyncio import get_event_loop, gather, ensure_future
from base64 import b64decode
import yaml
from colorlog import ColoredFormatter
from nacl.signing import SigningKey
from ssb.muxrpc import MuxRPCAPI, MuxRPCAPIException
from ssb.packet_stream import PSClient, PSMessageType
from ssb.util import load_ssb_secret
api = MuxRPCAPI()
@ -78,15 +75,10 @@ def main():
logger.setLevel(logging.INFO)
logger.addHandler(ch)
with open(os.path.expanduser('~/.ssb/secret')) as f:
config = yaml.load(f)
server_pub_key = b64decode(config['public'][:-8])
server_prv_key = b64decode(config['private'][:-8])
sign = SigningKey(server_prv_key[:32])
keypair = load_ssb_secret()['keypair']
loop = get_event_loop()
packet_stream = PSClient('127.0.0.1', 8008, sign, server_pub_key, loop=loop)
packet_stream = PSClient('127.0.0.1', 8008, keypair, bytes(keypair.verify_key), loop=loop)
loop.run_until_complete(_main(packet_stream))
loop.close()

View File

@ -1,17 +1,10 @@
import logging
import os
from asyncio import get_event_loop, ensure_future
from base64 import b64decode
import yaml
from colorlog import ColoredFormatter
from nacl.signing import SigningKey
from ssb.packet_stream import PSServer
with open(os.path.expanduser('~/.ssb/secret')) as f:
config = yaml.load(f)
from ssb.util import load_ssb_secret
async def on_connect():
@ -36,8 +29,7 @@ logger.addHandler(ch)
loop = get_event_loop()
server_keypair = SigningKey(b64decode(config['private'][:-8])[:32])
packet_stream = PSServer('127.0.0.1', 8008, server_keypair, loop=loop)
packet_stream = PSServer('127.0.0.1', 8008, load_ssb_secret()['keypair'], loop=loop)
packet_stream.on_connect(on_connect)
packet_stream.listen()

View File

@ -50,6 +50,7 @@ extras_require['all'] = sum((lst for lst in extras_require.values()), [])
install_requires = [
'pynacl==1.1.2',
'simplejson==3.10.0',
'PyYAML==3.12',
'secret-handshake'
]

36
ssb/tests/test_util.py Normal file
View File

@ -0,0 +1,36 @@
from base64 import b64decode
from unittest.mock import mock_open, patch
import pytest
from ssb.util import load_ssb_secret, ConfigException
CONFIG_FILE = """
## Comments should be supported too
{
"curve": "ed25519",
"public": "rsYpBIcXsxjQAf0JNes+MHqT2DL+EfopWKAp4rGeEPQ=ed25519",
"private": "/bqDBI/vGLD5qy3GxMsgHFgYIrrY08JfTzUaCYT6x0GuxikEhxezGNAB/Qk16z4wepPYMv4R+ilYoCnisZ4Q9A==",
"id": "@rsYpBIcXsxjQAf0JNes+MHqT2DL+EfopWKAp4rGeEPQ=.ed25519"
}
"""
CONFIG_FILE_INVALID = CONFIG_FILE.replace('ed25519', 'foo')
def test_load_secret():
with patch('ssb.util.open', mock_open(read_data=CONFIG_FILE), create=True):
secret = load_ssb_secret()
priv_key = b'\xfd\xba\x83\x04\x8f\xef\x18\xb0\xf9\xab-\xc6\xc4\xcb \x1cX\x18"\xba\xd8\xd3\xc2_O5\x1a\t\x84\xfa\xc7A'
assert secret['id'] == '@rsYpBIcXsxjQAf0JNes+MHqT2DL+EfopWKAp4rGeEPQ=.ed25519'
assert bytes(secret['keypair']) == priv_key
assert bytes(secret['keypair'].verify_key) == b64decode('rsYpBIcXsxjQAf0JNes+MHqT2DL+EfopWKAp4rGeEPQ=')
def test_load_exception():
with pytest.raises(ConfigException):
with patch('ssb.util.open', mock_open(read_data=CONFIG_FILE_INVALID), create=True):
load_ssb_secret()

View File

@ -1,53 +1,24 @@
import struct
import os
import yaml
from base64 import b64decode
# Stolen from PyCypto (Public Domain)
from nacl.signing import SigningKey
def b(s):
return s.encode("latin-1") # utf-8 would cause some side-effects we don't want
class ConfigException(Exception):
pass
def long_to_bytes(n, blocksize=0):
"""long_to_bytes(n:long, blocksize:int) : string
Convert a long integer to a byte string.
If optional blocksize is given and greater than zero, pad the front of the
byte string with binary zeros so that the length is a multiple of
blocksize.
"""
# after much testing, this algorithm was deemed to be the fastest
s = b('')
pack = struct.pack
while n > 0:
s = pack('>I', n & 0xffffffff) + s
n = n >> 32
# strip off leading zeros
for i in range(len(s)):
if s[i] != b('\000')[0]:
break
else:
# only happens when n == 0
s = b('\000')
i = 0
s = s[i:]
# add back some pad bytes. this could be done more efficiently w.r.t. the
# de-padding being done above, but sigh...
if blocksize > 0 and len(s) % blocksize:
s = (blocksize - len(s) % blocksize) * b('\000') + s
return s
def load_ssb_secret():
"""Load SSB keys from ~/.ssb"""
with open(os.path.expanduser('~/.ssb/secret')) as f:
config = yaml.load(f)
if config['curve'] != 'ed25519':
raise ConfigException('Algorithm not known: ' + config['curve'])
def bytes_to_long(s):
"""bytes_to_long(string) : long
Convert a byte string to a long integer.
This is (essentially) the inverse of long_to_bytes().
"""
acc = 0
unpack = struct.unpack
length = len(s)
if length % 4:
extra = (4 - length % 4)
s = b('\000') * extra + s
length = length + extra
for i in range(0, length, 4):
acc = (acc << 32) + unpack('>I', s[i:i+4])[0]
return acc
server_prv_key = b64decode(config['private'][:-8])
return {
'keypair': SigningKey(server_prv_key[:32]),
'id': config['id']
}