2017-06-04 19:50:49 +00:00
|
|
|
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
|
|
|
|
#
|
|
|
|
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
|
|
# of this software and associated documentation files (the "Software"), to deal
|
|
|
|
# in the Software without restriction, including without limitation the rights
|
|
|
|
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
|
|
# copies of the Software, and to permit persons to whom the Software is
|
|
|
|
# furnished to do so, subject to the following conditions:
|
|
|
|
#
|
|
|
|
# The above copyright notice and this permission notice shall be included in all
|
|
|
|
# copies or substantial portions of the Software.
|
|
|
|
#
|
|
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
|
|
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
|
|
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
|
|
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
|
|
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
|
|
# SOFTWARE.
|
|
|
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
import hmac
|
|
|
|
from base64 import b64decode
|
|
|
|
|
|
|
|
from nacl.bindings import crypto_scalarmult, crypto_box_afternm, crypto_box_open_afternm
|
|
|
|
from nacl.exceptions import CryptoError
|
|
|
|
from nacl.public import PrivateKey
|
|
|
|
from nacl.signing import VerifyKey
|
|
|
|
|
|
|
|
APPLICATION_KEY = b64decode('1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=')
|
|
|
|
|
|
|
|
|
|
|
|
class SHSError(Exception):
|
|
|
|
"""A SHS exception."""
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
class SHSCryptoBase(object):
|
|
|
|
def __init__(self, local_key, ephemeral_key=None, application_key=None):
|
|
|
|
self.local_key = local_key
|
|
|
|
self.application_key = application_key or APPLICATION_KEY
|
|
|
|
self._reset_keys(ephemeral_key or PrivateKey.generate())
|
|
|
|
|
|
|
|
def _reset_keys(self, ephemeral_key):
|
|
|
|
self.local_ephemeral_key = ephemeral_key
|
|
|
|
self.local_app_hmac = (hmac.new(self.application_key, bytes(ephemeral_key.public_key), digestmod='sha512')
|
|
|
|
.digest()[:32])
|
|
|
|
|
|
|
|
def generate_challenge(self):
|
|
|
|
"""Generate and return a challenge to be sent to the server."""
|
|
|
|
return self.local_app_hmac + bytes(self.local_ephemeral_key.public_key)
|
|
|
|
|
|
|
|
def verify_challenge(self, data):
|
|
|
|
"""Verify the correctness of challenge sent from the client."""
|
|
|
|
assert len(data) == 64
|
|
|
|
sent_hmac, remote_ephemeral_key = data[:32], data[32:]
|
|
|
|
|
|
|
|
h = hmac.new(self.application_key, remote_ephemeral_key, digestmod='sha512')
|
|
|
|
self.remote_app_hmac = h.digest()[:32]
|
|
|
|
ok = self.remote_app_hmac == sent_hmac
|
|
|
|
|
|
|
|
if ok:
|
|
|
|
# this is (a * b)
|
|
|
|
self.shared_secret = crypto_scalarmult(bytes(self.local_ephemeral_key), remote_ephemeral_key)
|
|
|
|
self.remote_ephemeral_key = remote_ephemeral_key
|
|
|
|
# this is hash(a * b)
|
|
|
|
self.shared_hash = hashlib.sha256(self.shared_secret).digest()
|
|
|
|
return ok
|
|
|
|
|
|
|
|
def clean(self, new_ephemeral_key=None):
|
|
|
|
self._reset_keys(new_ephemeral_key or PrivateKey.generate())
|
|
|
|
self.shared_secret = None
|
|
|
|
self.shared_hash = None
|
|
|
|
self.remote_ephemeral_key = None
|
|
|
|
|
|
|
|
def get_box_keys(self):
|
|
|
|
shared_secret = hashlib.sha256(self.box_secret).digest()
|
|
|
|
return {
|
|
|
|
'shared_secret': shared_secret,
|
|
|
|
'encrypt_key': hashlib.sha256(shared_secret + bytes(self.remote_pub_key)).digest(),
|
|
|
|
'decrypt_key': hashlib.sha256(shared_secret + bytes(self.local_key.verify_key)).digest(),
|
|
|
|
'encrypt_nonce': self.remote_app_hmac[:24],
|
|
|
|
'decrypt_nonce': self.local_app_hmac[:24]
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class SHSServerCrypto(SHSCryptoBase):
|
|
|
|
def verify_client_auth(self, data):
|
|
|
|
assert len(data) == 112
|
|
|
|
a_bob = crypto_scalarmult(bytes(self.local_key.to_curve25519_private_key()), self.remote_ephemeral_key)
|
|
|
|
box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob).digest()
|
|
|
|
self.hello = crypto_box_open_afternm(data, b'\x00' * 24, box_secret)
|
|
|
|
signature, public_key = self.hello[:64], self.hello[64:]
|
|
|
|
signed = self.application_key + bytes(self.local_key.verify_key) + self.shared_hash
|
|
|
|
pkey = VerifyKey(public_key)
|
|
|
|
|
|
|
|
# will raise an exception if verification fails
|
|
|
|
pkey.verify(signed, signature)
|
|
|
|
self.remote_pub_key = pkey
|
|
|
|
b_alice = crypto_scalarmult(bytes(self.local_ephemeral_key),
|
|
|
|
bytes(self.remote_pub_key.to_curve25519_public_key()))
|
|
|
|
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob + b_alice).digest()[:32]
|
|
|
|
return True
|
|
|
|
|
|
|
|
def generate_accept(self):
|
|
|
|
okay = self.local_key.sign(self.application_key + self.hello + self.shared_hash).signature
|
|
|
|
d = crypto_box_afternm(okay, b'\x00' * 24, self.box_secret)
|
|
|
|
return d
|
|
|
|
|
|
|
|
def clean(self, new_ephemeral_key=None):
|
|
|
|
super(SHSServerCrypto, self).clean(new_ephemeral_key=new_ephemeral_key)
|
|
|
|
self.hello = None
|
2017-06-05 09:45:22 +00:00
|
|
|
self.b_alice = None
|
2017-06-04 19:50:49 +00:00
|
|
|
|
|
|
|
|
|
|
|
class SHSClientCrypto(SHSCryptoBase):
|
|
|
|
"""An object that encapsulates all the SHS client-side crypto.
|
|
|
|
|
|
|
|
:param local_key: the keypair used by the client (:class:`nacl.public.PrivateKey` object)
|
|
|
|
:param server_pub_key: the server's public key (``byte`` string)
|
|
|
|
:param ephemeral_key: a fresh local :class:`nacl.public.PrivateKey`
|
|
|
|
:param application_key: the unique application key (``byte`` string), defaults to SSB's
|
|
|
|
"""
|
|
|
|
|
|
|
|
def __init__(self, local_key, server_pub_key, ephemeral_key, application_key=None):
|
|
|
|
super(SHSClientCrypto, self).__init__(local_key, ephemeral_key, application_key)
|
|
|
|
self.remote_pub_key = VerifyKey(server_pub_key)
|
|
|
|
|
|
|
|
def verify_server_challenge(self, data):
|
|
|
|
"""Verify the correctness of challenge sent from the server."""
|
2017-06-05 09:45:22 +00:00
|
|
|
assert super(SHSClientCrypto, self).verify_challenge(data)
|
2017-06-04 19:50:49 +00:00
|
|
|
curve_pkey = self.remote_pub_key.to_curve25519_public_key()
|
|
|
|
|
2017-06-05 09:45:22 +00:00
|
|
|
# a_bob is (a * B)
|
|
|
|
a_bob = crypto_scalarmult(bytes(self.local_ephemeral_key), bytes(curve_pkey))
|
|
|
|
self.a_bob = a_bob
|
2017-06-04 19:50:49 +00:00
|
|
|
# this shall be hash(K | a * b | a * B)
|
2017-06-05 09:45:22 +00:00
|
|
|
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob).digest()
|
2017-06-04 19:50:49 +00:00
|
|
|
|
|
|
|
# and message_to_box will correspond to H = sign(A)[K | Bp | hash(a * b)] | Ap
|
|
|
|
signed_message = self.local_key.sign(self.application_key + bytes(self.remote_pub_key) + self.shared_hash)
|
|
|
|
message_to_box = signed_message.signature + bytes(self.local_key.verify_key)
|
2017-06-05 09:45:22 +00:00
|
|
|
self.hello = message_to_box
|
|
|
|
return True
|
|
|
|
|
|
|
|
def generate_client_auth(self):
|
|
|
|
"""Generate box[K|a*b|a*B](H)"""
|
2017-06-04 19:50:49 +00:00
|
|
|
|
|
|
|
nonce = b"\x00" * 24
|
|
|
|
# return box(K | a * b | a * B)[H]
|
2017-06-05 09:45:22 +00:00
|
|
|
return crypto_box_afternm(self.hello, nonce, self.box_secret)
|
2017-06-04 19:50:49 +00:00
|
|
|
|
|
|
|
def verify_server_accept(self, data):
|
|
|
|
"""Verify that the server's accept message is sane"""
|
|
|
|
curve_lkey = self.local_key.to_curve25519_private_key()
|
2017-06-05 09:45:22 +00:00
|
|
|
# b_alice is (A * b)
|
|
|
|
b_alice = crypto_scalarmult(bytes(curve_lkey), self.remote_ephemeral_key)
|
|
|
|
self.b_alice = b_alice
|
2017-06-04 19:50:49 +00:00
|
|
|
# this is hash(K | a * b | a * B | A * b)
|
2017-06-05 09:45:22 +00:00
|
|
|
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + self.a_bob +
|
|
|
|
b_alice).digest()
|
2017-06-04 19:50:49 +00:00
|
|
|
|
|
|
|
nonce = b"\x00" * 24
|
|
|
|
|
|
|
|
try:
|
|
|
|
# let's use the box secret to unbox our encrypted message
|
|
|
|
signature = crypto_box_open_afternm(data, nonce, self.box_secret)
|
|
|
|
except CryptoError:
|
|
|
|
raise SHSError('Error decrypting server acceptance message')
|
|
|
|
|
|
|
|
# we should have received sign(B)[K | H | hash(a * b)]
|
|
|
|
# let's see if that signature can verify the reconstructed data on our side
|
2017-06-05 09:45:22 +00:00
|
|
|
self.remote_pub_key.verify(self.application_key + self.hello + self.shared_hash, signature)
|
2017-06-04 19:50:49 +00:00
|
|
|
return True
|
|
|
|
|
|
|
|
def clean(self, new_ephemeral_key=None):
|
|
|
|
super(SHSClientCrypto, self).clean(new_ephemeral_key=new_ephemeral_key)
|
2017-06-05 09:45:22 +00:00
|
|
|
self.a_bob = None
|
|
|
|
self.b_alice = None
|