First functional version

This commit is contained in:
Pedro Ferreira 2017-06-04 21:50:49 +02:00
commit 66a4149f12
18 changed files with 855 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
*.egg-info
.cache
.eggs
.coverage
__pycache__
*.pyc
node_modules

1
AUTHORS Normal file
View File

@ -0,0 +1 @@
Main author: Pedro Ferreira <pedro@dete.st>

0
CHANGES.rst Normal file
View File

19
LICENSE Normal file
View File

@ -0,0 +1,19 @@
Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

8
MANIFEST.in Normal file
View File

@ -0,0 +1,8 @@
include *.rst
include AUTHORS
include LICENSE
include README.rst
include pytest.ini
exclude examples
recursive-exclude examples *.py

9
README.rst Normal file
View File

@ -0,0 +1,9 @@
**WORK IN PROGRESS**
PySecretHandshake - Secret Handshake in Python
==============================================
This module implements Secret Handshake as specified in Dominc Tarr's paper `"Designing a Secret Handshake: Authenticated
Key Exchange as a Capability System" <http://dominictarr.github.io/secret-handshake-paper/shs.pdf>`_ (Dominic Tarr, 2015).
**Please, don't use this package in production. The implementation hasn't yet been carefully checked.**

28
examples/test_client.py Normal file
View File

@ -0,0 +1,28 @@
import os
import yaml
from asyncio import get_event_loop
from base64 import b64decode
from nacl.signing import SigningKey
from secret_handshake import SHSClient
with open(os.path.expanduser('~/.ssb/secret')) as f:
config = yaml.load(f)
async def main():
async for msg in client:
print(msg)
loop = get_event_loop()
server_pub_key = b64decode(config['public'][:-8])
client = SHSClient('localhost', 8008, SigningKey.generate(), server_pub_key, loop=loop)
client.connect()
loop.run_until_complete(main())
loop.close()

29
examples/test_server.py Normal file
View File

@ -0,0 +1,29 @@
import os
import yaml
from asyncio import get_event_loop
from base64 import b64decode
from nacl.signing import SigningKey
from secret_handshake import SHSServer
with open(os.path.expanduser('~/.ssb/secret')) as f:
config = yaml.load(f)
async def main():
async for msg in server:
print(msg)
loop = get_event_loop()
client_keypair = SigningKey(b64decode(config['private'][:-8])[:32])
server = SHSServer('localhost', 8008, client_keypair, loop=loop)
server.on_connect(main)
server.listen()
loop.run_forever()
loop.close()

3
pytest.ini Normal file
View File

@ -0,0 +1,3 @@
[pytest]
addopts = --cov secret_handshake --cov-report term-missing --no-cov-on-fail
python_files = secret_handshake/test_*.py

View File

@ -0,0 +1,24 @@
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from .network import SHSClient, SHSServer
__all__ = ('SHSClient', 'SHSServer')

View File

@ -0,0 +1,101 @@
import struct
from nacl.secret import SecretBox
from .util import bytes_to_long, long_to_bytes
NONCE_SIZE = 24
HEADER_LENGTH = 2 + 16 + 16
MAX_SEGMENT_SIZE = 4 * 1024
TERMINATION_HEADER = (b'\x00' * 18)
MAX_NONCE = (8 * NONCE_SIZE)
# TODO: Implement handling of messages > 4k
def inc_nonce(nonce):
num = bytes_to_long(nonce) + 1
if num > 2 ** MAX_NONCE:
num = 0
bnum = long_to_bytes(num)
bnum = b'\x00' * (NONCE_SIZE - len(bnum)) + bnum
return bnum
def get_stream_pair(reader, writer, **kwargs):
"""Return a tuple with `(unbox_stream, box_stream)` (reader/writer).
:return: (:class:`secret_handshake.boxstream.UnboxStream`,
:class:`secret_handshake.boxstream.BoxStream`) """
box_args = {
'key': kwargs['encrypt_key'],
'nonce': kwargs['encrypt_nonce'],
}
unbox_args = {
'key': kwargs['decrypt_key'],
'nonce': kwargs['decrypt_nonce'],
}
return UnboxStream(reader, **unbox_args), BoxStream(writer, **box_args)
class UnboxStream(object):
def __init__(self, reader, key, nonce):
self.reader = reader
self.key = key
self.nonce = nonce
self.closed = False
async def read(self):
data = await self.reader.read(HEADER_LENGTH)
if not data:
self.closed = True
return None
box = SecretBox(self.key)
header = box.decrypt(data, self.nonce)
if header == TERMINATION_HEADER:
self.closed = True
return None
length = struct.unpack('>H', header[:2])[0]
mac = header[2:]
data = await self.reader.read(length)
body = box.decrypt(mac + data, inc_nonce(self.nonce))
self.nonce = inc_nonce(inc_nonce(self.nonce))
return body
async def __aiter__(self):
while True:
data = await self.read()
if data is None:
return
yield data
class BoxStream(object):
def __init__(self, writer, key, nonce):
self.writer = writer
self.key = key
self.box = SecretBox(self.key)
self.nonce = nonce
def write(self, data):
# XXX: This nonce logic is almost for sure wrong
body = self.box.encrypt(data, inc_nonce(self.nonce))[24:]
header = struct.pack('>H', len(body) - 16) + body[:16]
hdrbox = self.box.encrypt(header, self.nonce)[24:]
self.writer.write(hdrbox)
self.nonce = inc_nonce(inc_nonce(self.nonce))
self.writer.write(body[16:])
def close(self):
self.writer.write(self.box.encrypt(b'\x00' * 18, self.nonce)[24:])

181
secret_handshake/crypto.py Normal file
View File

@ -0,0 +1,181 @@
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import hashlib
import hmac
from base64 import b64decode
from nacl.bindings import crypto_scalarmult, crypto_box_afternm, crypto_box_open_afternm
from nacl.exceptions import CryptoError
from nacl.public import PrivateKey
from nacl.signing import VerifyKey
APPLICATION_KEY = b64decode('1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=')
class SHSError(Exception):
"""A SHS exception."""
pass
class SHSCryptoBase(object):
def __init__(self, local_key, ephemeral_key=None, application_key=None):
self.local_key = local_key
self.application_key = application_key or APPLICATION_KEY
self._reset_keys(ephemeral_key or PrivateKey.generate())
def _reset_keys(self, ephemeral_key):
self.local_ephemeral_key = ephemeral_key
self.local_app_hmac = (hmac.new(self.application_key, bytes(ephemeral_key.public_key), digestmod='sha512')
.digest()[:32])
def generate_challenge(self):
"""Generate and return a challenge to be sent to the server."""
return self.local_app_hmac + bytes(self.local_ephemeral_key.public_key)
def verify_challenge(self, data):
"""Verify the correctness of challenge sent from the client."""
assert len(data) == 64
sent_hmac, remote_ephemeral_key = data[:32], data[32:]
h = hmac.new(self.application_key, remote_ephemeral_key, digestmod='sha512')
self.remote_app_hmac = h.digest()[:32]
ok = self.remote_app_hmac == sent_hmac
if ok:
# this is (a * b)
self.shared_secret = crypto_scalarmult(bytes(self.local_ephemeral_key), remote_ephemeral_key)
self.remote_ephemeral_key = remote_ephemeral_key
# this is hash(a * b)
self.shared_hash = hashlib.sha256(self.shared_secret).digest()
return ok
def clean(self, new_ephemeral_key=None):
self._reset_keys(new_ephemeral_key or PrivateKey.generate())
self.shared_secret = None
self.shared_hash = None
self.remote_ephemeral_key = None
def get_box_keys(self):
shared_secret = hashlib.sha256(self.box_secret).digest()
return {
'shared_secret': shared_secret,
'encrypt_key': hashlib.sha256(shared_secret + bytes(self.remote_pub_key)).digest(),
'decrypt_key': hashlib.sha256(shared_secret + bytes(self.local_key.verify_key)).digest(),
'encrypt_nonce': self.remote_app_hmac[:24],
'decrypt_nonce': self.local_app_hmac[:24]
}
class SHSServerCrypto(SHSCryptoBase):
def verify_client_auth(self, data):
assert len(data) == 112
a_bob = crypto_scalarmult(bytes(self.local_key.to_curve25519_private_key()), self.remote_ephemeral_key)
box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob).digest()
self.hello = crypto_box_open_afternm(data, b'\x00' * 24, box_secret)
signature, public_key = self.hello[:64], self.hello[64:]
signed = self.application_key + bytes(self.local_key.verify_key) + self.shared_hash
pkey = VerifyKey(public_key)
# will raise an exception if verification fails
pkey.verify(signed, signature)
self.remote_pub_key = pkey
b_alice = crypto_scalarmult(bytes(self.local_ephemeral_key),
bytes(self.remote_pub_key.to_curve25519_public_key()))
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob + b_alice).digest()[:32]
return True
def generate_accept(self):
okay = self.local_key.sign(self.application_key + self.hello + self.shared_hash).signature
d = crypto_box_afternm(okay, b'\x00' * 24, self.box_secret)
return d
def clean(self, new_ephemeral_key=None):
super(SHSServerCrypto, self).clean(new_ephemeral_key=new_ephemeral_key)
self.hello = None
self.local_lterm_shared = None
class SHSClientCrypto(SHSCryptoBase):
"""An object that encapsulates all the SHS client-side crypto.
:param local_key: the keypair used by the client (:class:`nacl.public.PrivateKey` object)
:param server_pub_key: the server's public key (``byte`` string)
:param ephemeral_key: a fresh local :class:`nacl.public.PrivateKey`
:param application_key: the unique application key (``byte`` string), defaults to SSB's
"""
def __init__(self, local_key, server_pub_key, ephemeral_key, application_key=None):
super(SHSClientCrypto, self).__init__(local_key, ephemeral_key, application_key)
self.remote_pub_key = VerifyKey(server_pub_key)
def verify_server_challenge(self, data):
"""Verify the correctness of challenge sent from the server."""
# TODO: use super.verify_challenge and add extra logic
return super(SHSClientCrypto, self).verify_challenge(data)
def generate_client_auth(self):
"""Generate box[K|a*b|a*B](H)"""
curve_pkey = self.remote_pub_key.to_curve25519_public_key()
# remote_lterm_shared is (a * B)
remote_lterm_shared = crypto_scalarmult(bytes(self.local_ephemeral_key), bytes(curve_pkey))
self.remote_lterm_shared = remote_lterm_shared
# this shall be hash(K | a * b | a * B)
box_secret = hashlib.sha256(self.application_key + self.shared_secret + remote_lterm_shared).digest()
# and message_to_box will correspond to H = sign(A)[K | Bp | hash(a * b)] | Ap
signed_message = self.local_key.sign(self.application_key + bytes(self.remote_pub_key) + self.shared_hash)
message_to_box = signed_message.signature + bytes(self.local_key.verify_key)
self.client_auth = message_to_box
nonce = b"\x00" * 24
# return box(K | a * b | a * B)[H]
return crypto_box_afternm(message_to_box, nonce, box_secret)
def verify_server_accept(self, data):
"""Verify that the server's accept message is sane"""
curve_lkey = self.local_key.to_curve25519_private_key()
# local_lterm_shared is (A * b)
local_lterm_shared = crypto_scalarmult(bytes(curve_lkey), self.remote_ephemeral_key)
self.local_lterm_shared = local_lterm_shared
# this is hash(K | a * b | a * B | A * b)
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + self.remote_lterm_shared +
local_lterm_shared).digest()
nonce = b"\x00" * 24
try:
# let's use the box secret to unbox our encrypted message
signature = crypto_box_open_afternm(data, nonce, self.box_secret)
except CryptoError:
raise SHSError('Error decrypting server acceptance message')
# we should have received sign(B)[K | H | hash(a * b)]
# let's see if that signature can verify the reconstructed data on our side
self.remote_pub_key.verify(self.application_key + self.client_auth + self.shared_hash, signature)
return True
def clean(self, new_ephemeral_key=None):
super(SHSClientCrypto, self).clean(new_ephemeral_key=new_ephemeral_key)
self.remote_lterm_shared = None
self.local_lterm_shared = None

120
secret_handshake/network.py Normal file
View File

@ -0,0 +1,120 @@
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from asyncio import open_connection, start_server
from .boxstream import get_stream_pair
from .crypto import SHSClientCrypto, SHSServerCrypto
class SHSClientException(Exception):
pass
class SHSSocket(object):
def __init__(self, loop):
self.loop = loop
def write(self, data):
self.write_stream.write(data)
async def read(self):
return await self.read_stream.read()
async def disconnect(self):
self.writer.close()
async def __aiter__(self):
async for msg in self.read_stream:
yield msg
class SHSServer(SHSSocket):
def __init__(self, host, port, server_kp, application_key=None, loop=None):
super(SHSServer, self).__init__(loop)
self.host = host
self.port = port
self.crypto = SHSServerCrypto(server_kp, application_key=application_key)
self._on_connect = None
async def _handshake(self, reader, writer):
data = await reader.read(64)
if not self.crypto.verify_challenge(data):
raise SHSClientException('Client challenge is not valid')
writer.write(self.crypto.generate_challenge())
data = await reader.read(112)
if not self.crypto.verify_client_auth(data):
raise SHSClientException('Client auth is not valid')
writer.write(self.crypto.generate_accept())
async def handle_connection(self, reader, writer):
self.crypto.clean()
await self._handshake(reader, writer)
keys = self.crypto.get_box_keys()
self.crypto.clean()
self.read_stream, self.write_stream = get_stream_pair(reader, writer, **keys)
self.writer = writer
if self._on_connect:
await self._on_connect()
def listen(self):
self.loop.run_until_complete(start_server(self.handle_connection, self.host, self.port, loop=self.loop))
def on_connect(self, cb):
self._on_connect = cb
class SHSClient(SHSSocket):
def __init__(self, host, port, client_kp, server_pub_key, ephemeral_key=None, application_key=None, loop=None):
super(SHSClient, self).__init__(loop)
self.host = host
self.port = port
self.crypto = SHSClientCrypto(client_kp, server_pub_key, ephemeral_key=ephemeral_key,
application_key=application_key)
async def _handshake(self, reader, writer):
writer.write(self.crypto.generate_challenge())
data = await reader.read(64)
if not self.crypto.verify_server_challenge(data):
raise SHSClientException('Server challenge is not valid')
writer.write(self.crypto.generate_client_auth())
data = await reader.read(80)
if not self.crypto.verify_server_accept(data):
raise SHSClientException('Server accept is not valid')
def connect(self):
reader, writer = self.loop.run_until_complete(open_connection(self.host, self.port, loop=self.loop))
self.loop.run_until_complete(self._handshake(reader, writer))
keys = self.crypto.get_box_keys()
self.crypto.clean()
self.read_stream, self.write_stream = get_stream_pair(reader, writer, **keys)
self.writer = writer

View File

@ -0,0 +1,74 @@
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import pytest
from io import BytesIO
from .test_crypto import (CLIENT_ENCRYPT_KEY, CLIENT_ENCRYPT_NONCE)
from secret_handshake.boxstream import BoxStream, UnboxStream
MESSAGE_1 = (b'\xcev\xedE\x06l\x02\x13\xc8\x17V\xfa\x8bZ?\x88B%O\xb0L\x9f\x8e\x8c0y\x1dv\xc0\xc9\xf6\x9d\xc2\xdf\xdb'
b'\xee\x9d')
MESSAGE_2 = b"\x141\xd63\x13d\xd1\xecZ\x9b\xd0\xd4\x03\xcdR?'\xaa.\x89I\x92I\xf9guL\xaa\x06?\xea\xca/}\x88*\xb2"
MESSAGE_3 = (b'\xcbYY\xf1\x0f\xa5O\x13r\xa6"\x15\xc5\x9d\r.*\x0b\x92\x10m\xa6(\x0c\x0c\xc61\x80j\x81)\x800\xed\xda'
b'\xad\xa1')
MESSAGE_CLOSED = b'\xb1\x14hU\'\xb5M\xa6"\x03\x9duy\xa1\xd4evW,\xdcE\x18\xe4+ C4\xe8h\x96\xed\xc5\x94\x80'
class AsyncBuffer(BytesIO):
"""Just a BytesIO with an async read method."""
async def read(self, n=None):
return super(AsyncBuffer, self).read(n)
@pytest.mark.asyncio
async def test_boxstream():
buffer = AsyncBuffer()
box_stream = BoxStream(buffer, CLIENT_ENCRYPT_KEY, CLIENT_ENCRYPT_NONCE)
box_stream.write(b'foo')
buffer.seek(0)
assert await buffer.read() == MESSAGE_1
pos = buffer.tell()
box_stream.write(b'foo')
buffer.seek(pos)
assert await buffer.read() == MESSAGE_2
pos = buffer.tell()
box_stream.write(b'bar')
buffer.seek(pos)
assert await buffer.read() == MESSAGE_3
pos = buffer.tell()
box_stream.close()
buffer.seek(pos)
assert await buffer.read() == MESSAGE_CLOSED
@pytest.mark.asyncio
async def test_unboxstream():
buffer = AsyncBuffer(MESSAGE_1 + MESSAGE_2 + MESSAGE_3 + MESSAGE_CLOSED)
buffer.seek(0)
unbox_stream = UnboxStream(buffer, CLIENT_ENCRYPT_KEY, CLIENT_ENCRYPT_NONCE)
assert not unbox_stream.closed
assert [msg async for msg in unbox_stream] == [b'foo', b'foo', b'bar']
assert unbox_stream.closed

View File

@ -0,0 +1,111 @@
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import hashlib
import pytest
from nacl.public import PrivateKey
from nacl.signing import SigningKey
from secret_handshake.crypto import SHSClientCrypto, SHSServerCrypto
APP_KEY = hashlib.sha256(b'app_key').digest()
SERVER_KEY_SEED = b'\xcaw\x01\xc2cQ\xfd\x94\x9f\x14\x84\x0c0<l\xd8\xe4\xf5>\x12\\\x96\xcd\x9b\x0c\x02z&\x96!\xe0\xa2'
CLIENT_KEY_SEED = b'\xbf\x02<\xd3e\x9d\xac-\xd1\x9e-{\xe5q\x90\x03\x11\xba\x8cSQ\xa0\xc3p~\x89\xe6\xeeb\xaa\x1c\x17'
SERVER_EPH_KEY_SEED = b"ed\x1c\x01\x03s\x04\xdc\x8e`\xd6Z\xd0u;\xcbX\x91\xd8ZO\xf8\xf0\xd6'\xd5\xb1Yy\x13yH"
CLIENT_EPH_KEY_SEED = b'u8\xd0\xe3\x85d_Pz\x0c\xf5\xfd\x15\xce2p#\xb0\xf0\x9f\xe6!\xe1\xcb\xf6\x93\t\xebr{1\x8b'
@pytest.fixture()
def server():
server_key = SigningKey(SERVER_KEY_SEED)
server_eph_key = PrivateKey(SERVER_EPH_KEY_SEED)
return SHSServerCrypto(server_key, server_eph_key, application_key=APP_KEY)
@pytest.fixture()
def client():
client_key = SigningKey(CLIENT_KEY_SEED)
server_key = SigningKey(SERVER_KEY_SEED)
client_eph_key = PrivateKey(CLIENT_EPH_KEY_SEED)
return SHSClientCrypto(client_key, bytes(server_key.verify_key), client_eph_key, application_key=APP_KEY)
CLIENT_CHALLENGE = (b'd\xe8\xccD\xec\xb9E\xbb\xaa\xa7\x7f\xe38\x15\x16\xef\xca\xd22u\x1d\xfe<\xe7j'
b'\xd7\xf0uc\xf0r\xf3\x7f\t\x18\xec\x8c\xf7\xff\x8e\xa9\xc83\x13\x18R\x16\x1d'
b'\xe5\xc6K\xae\x94\xdbVt\x84\xdc\x1c@+D\x1c%')
CLIENT_AUTH = (b'\xf2\xaf?z\x15\x10\xd0\xf0\xdf\xe3\x91\xfe\x14\x1c}z\xab\xeey\xf5\xef\xfc\xa1EdV\xf2T\x95s[!$z'
b'\xeb\x8f\x1b\x96JP\x17^\x92\xc8\x9e\xb4*5`\xf2\x8fI.\x93\xb9\x14:\xca@\x06\xff\xd1\xf1J\xc8t\xc4'
b'\xd8\xc3$[\xc5\x94je\x83\x00%\x99\x10\x16\xb1\xa2\xb2\xb7\xbf\xc9\x88\x14\xb9\xbb^\tzq\xa4\xef\xc5'
b'\xf5\x1f7#\xed\x92X\xb2\xe3\xe5\x8b[t3')
SERVER_CHALLENGE = (b'S\\\x06\x8d\xe5\xeb&*\xb8\x0bp\xb3Z\x8e\\\x85\x14\xaa\x1c\x8di\x9d\x7f\xa9\xeawl\xb9}\x85\xc3ik'
b'\x0c ($E\xb4\x8ax\xc4)t<\xd7\x8b\xd6\x07\xb7\xecw\x84\r\xe1-Iz`\xeb\x04\x89\xd6{')
SERVER_ACCEPT = (b'\xb4\xd0\xea\xfb\xfb\xf6s\xcc\x10\xc4\x99\x95"\x13 y\xa6\xea.G\xeed\x8d=t9\x88|\x94\xd1\xbcK\xd47'
b'\xd8\xbcG1h\xac\xd0\xeb*\x1f\x8d\xae\x0b\x91G\xa1\xe6\x96b\xf2\xda90u\xeb_\xab\xdb\xcb%d7}\xb5\xce'
b'(k\x15\xe3L\x9d)\xd5\xa1|:')
INTER_SHARED_SECRET = (b'vf\xd82\xaeU\xda]\x08\x9eZ\xd6\x06\xcc\xd3\x99\xfd\xce\xc5\x16e8n\x9a\x04\x04\x84\xc5\x1a'
b'\x8f\xf2M')
BOX_SECRET = b'\x03\xfe\xe3\x8c u\xbcl^\x17eD\x96\xa3\xa6\x880f\x11\x7f\x85\xf2:\xa3[`\x06[#l\xbcr'
SHARED_SECRET = b'UV\xad*\x8e\xce\x88\xf2\x87l\x13iZ\x12\xd7\xa6\xd1\x9c-\x9d\x07\xf5\xa96\x03w\x11\xe5\x96$m\x1d'
CLIENT_ENCRYPT_KEY = (b'\xec\x1f,\x82\x9f\xedA\xc0\xda\x87[\xf9u\xbf\xac\x9cI\xa5T\xd1\x91\xff\xa8.\xd0 \xfbU\xc7\x14'
b')\xc7')
CLIENT_DECRYPT_KEY = b'\xf9e\xa0As\xb2=\xb7P~\xf3\xf9(\xfd\x7f\xfe\xb7TZhn\xd7\x8c=\xea.o\x9e\x8c9)\x10'
CLIENT_ENCRYPT_NONCE = b'S\\\x06\x8d\xe5\xeb&*\xb8\x0bp\xb3Z\x8e\\\x85\x14\xaa\x1c\x8di\x9d\x7f\xa9'
CLIENT_DECRYPT_NONCE = b'd\xe8\xccD\xec\xb9E\xbb\xaa\xa7\x7f\xe38\x15\x16\xef\xca\xd22u\x1d\xfe<\xe7'
def test_handshake(client, server):
client_challenge = client.generate_challenge()
assert client_challenge == CLIENT_CHALLENGE
assert server.verify_challenge(client_challenge)
server_challenge = server.generate_challenge()
assert server_challenge == SERVER_CHALLENGE
assert client.verify_server_challenge(server_challenge)
assert client.shared_secret == INTER_SHARED_SECRET
client_auth = client.generate_client_auth()
assert client_auth == CLIENT_AUTH
assert server.verify_client_auth(client_auth)
assert server.shared_secret == client.shared_secret
server_accept = server.generate_accept()
assert server_accept == SERVER_ACCEPT
assert client.verify_server_accept(server_accept)
assert client.box_secret == BOX_SECRET
assert client.box_secret == server.box_secret
client_keys = client.get_box_keys()
server_keys = server.get_box_keys()
assert client_keys['shared_secret'] == SHARED_SECRET
assert client_keys['encrypt_key'] == CLIENT_ENCRYPT_KEY
assert client_keys['decrypt_key'] == CLIENT_DECRYPT_KEY
assert client_keys['encrypt_nonce'] == CLIENT_ENCRYPT_NONCE
assert client_keys['decrypt_nonce'] == CLIENT_DECRYPT_NONCE
assert client_keys['shared_secret'] == server_keys['shared_secret']
assert client_keys['encrypt_key'] == server_keys['decrypt_key']
assert client_keys['encrypt_nonce'] == server_keys['decrypt_nonce']

53
secret_handshake/util.py Normal file
View File

@ -0,0 +1,53 @@
import struct
# Stolen from PyCypto (Public Domain)
def b(s):
return s.encode("latin-1") # utf-8 would cause some side-effects we don't want
def long_to_bytes(n, blocksize=0):
"""long_to_bytes(n:long, blocksize:int) : string
Convert a long integer to a byte string.
If optional blocksize is given and greater than zero, pad the front of the
byte string with binary zeros so that the length is a multiple of
blocksize.
"""
# after much testing, this algorithm was deemed to be the fastest
s = b('')
pack = struct.pack
while n > 0:
s = pack('>I', n & 0xffffffff) + s
n = n >> 32
# strip off leading zeros
for i in range(len(s)):
if s[i] != b('\000')[0]:
break
else:
# only happens when n == 0
s = b('\000')
i = 0
s = s[i:]
# add back some pad bytes. this could be done more efficiently w.r.t. the
# de-padding being done above, but sigh...
if blocksize > 0 and len(s) % blocksize:
s = (blocksize - len(s) % blocksize) * b('\000') + s
return s
def bytes_to_long(s):
"""bytes_to_long(string) : long
Convert a byte string to a long integer.
This is (essentially) the inverse of long_to_bytes().
"""
acc = 0
unpack = struct.unpack
length = len(s)
if length % 4:
extra = (4 - length % 4)
s = b('\000') * extra + s
length = length + extra
for i in range(0, length, 4):
acc = (acc << 32) + unpack('>I', s[i:i+4])[0]
return acc

5
setup.cfg Normal file
View File

@ -0,0 +1,5 @@
[flake8]
max-line-length=120
[aliases]
test=pytest

82
setup.py Normal file
View File

@ -0,0 +1,82 @@
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""A module that implements Secret Handshake as specified in "Designing a Secret Handshake: Authenticated
Key Exchange as a Capability System" (Dominic Tarr, 2015)."""
from setuptools import find_packages, setup
readme = open('README.rst').read()
history = open('CHANGES.rst').read()
tests_require = [
'check-manifest>=0.25',
'coverage>=4.0',
'isort>=4.2.2',
'pep257>=0.7.0',
'pytest-cov>=1.8.0',
'pytest>=3.1.1',
'pytest-asyncio==0.6.0'
]
extras_require = {
'docs': [
'Sphinx>=1.6.2',
],
'tests': tests_require,
}
extras_require['all'] = sum((lst for lst in extras_require.values()), [])
install_requires = [
'pynacl==1.1.2'
]
setup_requires = [
'pytest-runner'
]
packages = find_packages()
setup(
name='secret-handshake',
version='0.0.1',
description=__doc__,
long_description=(readme + '\n\n' + history),
license='MIT',
author='PySecretHandshake Contributors',
author_email='pedro@dete.st',
url='https://github.com/pferreir/PySecretHandshake',
packages=packages,
include_package_data=True,
extras_require=extras_require,
install_requires=install_requires,
setup_requires=setup_requires,
tests_require=tests_require,
zip_safe=False,
classifiers=[
'Intended Audience :: Developers',
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Topic :: Software Development :: Libraries :: Python Modules',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6'
],
)