PySecretHandshake/secret_handshake/crypto.py

259 lines
9.6 KiB
Python
Raw Normal View History

# SPDX-License-Identifier: MIT
#
2017-06-04 19:50:49 +00:00
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
2023-10-29 08:55:39 +00:00
"""Cryptography functions"""
2017-06-04 19:50:49 +00:00
2023-10-29 08:55:39 +00:00
from base64 import b64decode
2017-06-04 19:50:49 +00:00
import hashlib
import hmac
2023-10-29 08:55:39 +00:00
from typing import Optional
2017-06-04 19:50:49 +00:00
from nacl.bindings import crypto_box_afternm, crypto_box_open_afternm, crypto_scalarmult
2017-06-04 19:50:49 +00:00
from nacl.exceptions import CryptoError
from nacl.public import PrivateKey
from nacl.signing import SigningKey, VerifyKey
from .boxstream import BoxStreamKeys
2017-06-04 19:50:49 +00:00
APPLICATION_KEY = b64decode("1KHLiKZvAvjbY1ziZEHMXawbCEIM6qwjCDm3VYRan/s=")
2017-06-04 19:50:49 +00:00
class SHSError(Exception):
"""A SHS exception."""
2017-06-04 19:50:49 +00:00
2023-10-29 08:55:39 +00:00
class SHSCryptoBase: # pylint: disable=too-many-instance-attributes
"""Base functions for SHS cryptography"""
2017-06-04 19:50:49 +00:00
def __init__(
self,
local_key: SigningKey,
ephemeral_key: Optional[PrivateKey] = None,
application_key: Optional[bytes] = None,
):
2017-06-04 19:50:49 +00:00
self.application_key = application_key or APPLICATION_KEY
self.b_alice: Optional[bytes] = None
self.box_secret: Optional[bytes] = None
self.hello: Optional[bytes] = None
self.local_key = local_key
self.remote_app_hmac: Optional[bytes] = None
self.remote_ephemeral_key: Optional[bytes] = None
self.remote_pub_key: Optional[VerifyKey] = None
self.shared_hash: Optional[bytes] = None
self.shared_secret: Optional[bytes] = None
2017-06-04 19:50:49 +00:00
self._reset_keys(ephemeral_key or PrivateKey.generate())
def _reset_keys(self, ephemeral_key: PrivateKey) -> None:
2017-06-04 19:50:49 +00:00
self.local_ephemeral_key = ephemeral_key
self.local_app_hmac = hmac.new(
self.application_key, bytes(ephemeral_key.public_key), digestmod="sha512"
).digest()[:32]
2017-06-04 19:50:49 +00:00
def generate_challenge(self) -> bytes:
2017-06-04 19:50:49 +00:00
"""Generate and return a challenge to be sent to the server."""
return self.local_app_hmac + bytes(self.local_ephemeral_key.public_key)
def verify_challenge(self, data: bytes) -> bool:
2017-06-04 19:50:49 +00:00
"""Verify the correctness of challenge sent from the client."""
2017-06-04 19:50:49 +00:00
assert len(data) == 64
sent_hmac, remote_ephemeral_key = data[:32], data[32:]
h = hmac.new(self.application_key, remote_ephemeral_key, digestmod="sha512")
2017-06-04 19:50:49 +00:00
self.remote_app_hmac = h.digest()[:32]
ok = self.remote_app_hmac == sent_hmac
if ok:
# this is (a * b)
self.shared_secret = crypto_scalarmult(bytes(self.local_ephemeral_key), remote_ephemeral_key)
self.remote_ephemeral_key = remote_ephemeral_key
# this is hash(a * b)
self.shared_hash = hashlib.sha256(self.shared_secret).digest()
2017-06-04 19:50:49 +00:00
return ok
def clean(self, new_ephemeral_key: Optional[PrivateKey] = None) -> None:
2023-10-29 08:55:39 +00:00
"""Clean internal data"""
2017-06-04 19:50:49 +00:00
self._reset_keys(new_ephemeral_key or PrivateKey.generate())
self.shared_secret = None
self.shared_hash = None
self.remote_ephemeral_key = None
def get_box_keys(self) -> BoxStreamKeys:
2023-10-29 08:55:39 +00:00
"""Get the box streams keys"""
assert self.box_secret
assert self.remote_app_hmac
assert self.remote_pub_key
2017-06-04 19:50:49 +00:00
shared_secret = hashlib.sha256(self.box_secret).digest()
2017-06-04 19:50:49 +00:00
return {
"shared_secret": shared_secret,
"encrypt_key": hashlib.sha256(shared_secret + bytes(self.remote_pub_key)).digest(),
"decrypt_key": hashlib.sha256(shared_secret + bytes(self.local_key.verify_key)).digest(),
"encrypt_nonce": self.remote_app_hmac[:24],
"decrypt_nonce": self.local_app_hmac[:24],
2017-06-04 19:50:49 +00:00
}
class SHSServerCrypto(SHSCryptoBase):
2023-10-29 08:55:39 +00:00
"""SHS server crypto algorithm"""
def verify_client_auth(self, data: bytes) -> bool:
2023-10-29 08:55:39 +00:00
"""Verify client authentication data"""
assert self.remote_ephemeral_key
assert self.shared_hash
assert self.shared_secret
2017-06-04 19:50:49 +00:00
assert len(data) == 112
2017-06-04 19:50:49 +00:00
a_bob = crypto_scalarmult(bytes(self.local_key.to_curve25519_private_key()), self.remote_ephemeral_key)
box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob).digest()
self.hello = crypto_box_open_afternm(data, b"\x00" * 24, box_secret)
2017-06-04 19:50:49 +00:00
signature, public_key = self.hello[:64], self.hello[64:]
signed = self.application_key + bytes(self.local_key.verify_key) + self.shared_hash
pkey = VerifyKey(public_key)
# will raise an exception if verification fails
pkey.verify(signed, signature)
self.remote_pub_key = pkey
b_alice = crypto_scalarmult(
bytes(self.local_ephemeral_key), bytes(self.remote_pub_key.to_curve25519_public_key())
)
2017-06-04 19:50:49 +00:00
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob + b_alice).digest()[:32]
2017-06-04 19:50:49 +00:00
return True
def generate_accept(self) -> bytes:
2023-10-29 08:55:39 +00:00
"""Generate an accept message"""
assert self.box_secret
assert self.hello
assert self.shared_hash
2017-06-04 19:50:49 +00:00
okay = self.local_key.sign(self.application_key + self.hello + self.shared_hash).signature
2023-10-29 08:55:39 +00:00
return crypto_box_afternm(okay, b"\x00" * 24, self.box_secret)
2017-06-04 19:50:49 +00:00
def clean(self, new_ephemeral_key: Optional[PrivateKey] = None) -> None:
2023-10-29 08:55:39 +00:00
super().clean(new_ephemeral_key=new_ephemeral_key)
2017-06-04 19:50:49 +00:00
self.hello = None
self.b_alice = None
2017-06-04 19:50:49 +00:00
class SHSClientCrypto(SHSCryptoBase):
"""An object that encapsulates all the SHS client-side crypto.
2023-10-29 08:55:39 +00:00
:param local_key: the keypair used by the client
:param server_pub_key: the server's public key
:param ephemeral_key: a fresh local private key
:param application_key: the unique application key, defaults to SSB's
2017-06-04 19:50:49 +00:00
"""
2023-10-29 08:55:39 +00:00
def __init__(
self,
local_key: SigningKey,
2023-10-29 08:55:39 +00:00
server_pub_key: bytes,
ephemeral_key: PrivateKey,
application_key: Optional[bytes] = None,
):
super().__init__(local_key, ephemeral_key, application_key)
self.a_bob: Optional[bytes] = None
2017-06-04 19:50:49 +00:00
self.remote_pub_key = VerifyKey(server_pub_key)
def verify_server_challenge(self, data: bytes) -> bool:
2017-06-04 19:50:49 +00:00
"""Verify the correctness of challenge sent from the server."""
assert self.remote_pub_key
2023-10-29 08:55:39 +00:00
assert super().verify_challenge(data)
assert self.shared_hash
assert self.shared_secret
2017-06-04 19:50:49 +00:00
curve_pkey = self.remote_pub_key.to_curve25519_public_key()
# a_bob is (a * B)
a_bob = crypto_scalarmult(bytes(self.local_ephemeral_key), bytes(curve_pkey))
self.a_bob = a_bob
2017-06-04 19:50:49 +00:00
# this shall be hash(K | a * b | a * B)
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + a_bob).digest()
2017-06-04 19:50:49 +00:00
# and message_to_box will correspond to H = sign(A)[K | Bp | hash(a * b)] | Ap
signed_message = self.local_key.sign(self.application_key + bytes(self.remote_pub_key) + self.shared_hash)
message_to_box = signed_message.signature + bytes(self.local_key.verify_key)
self.hello = message_to_box
return True
def generate_client_auth(self) -> bytes:
"""Generate box[K|a*b|a*B](H)"""
2017-06-04 19:50:49 +00:00
assert self.box_secret
assert self.hello
2017-06-04 19:50:49 +00:00
nonce = b"\x00" * 24
2017-06-04 19:50:49 +00:00
# return box(K | a * b | a * B)[H]
return crypto_box_afternm(self.hello, nonce, self.box_secret)
2017-06-04 19:50:49 +00:00
def verify_server_accept(self, data: bytes) -> bool:
2017-06-04 19:50:49 +00:00
"""Verify that the server's accept message is sane"""
assert self.a_bob
assert self.hello
assert self.remote_ephemeral_key
assert self.remote_pub_key
assert self.shared_hash
assert self.shared_secret
2017-06-04 19:50:49 +00:00
curve_lkey = self.local_key.to_curve25519_private_key()
# b_alice is (A * b)
b_alice = crypto_scalarmult(bytes(curve_lkey), self.remote_ephemeral_key)
self.b_alice = b_alice
2017-06-04 19:50:49 +00:00
# this is hash(K | a * b | a * B | A * b)
self.box_secret = hashlib.sha256(self.application_key + self.shared_secret + self.a_bob + b_alice).digest()
2017-06-04 19:50:49 +00:00
nonce = b"\x00" * 24
try:
# let's use the box secret to unbox our encrypted message
signature = crypto_box_open_afternm(data, nonce, self.box_secret)
2023-10-29 08:55:39 +00:00
except CryptoError as exc:
raise SHSError("Error decrypting server acceptance message") from exc
2017-06-04 19:50:49 +00:00
# we should have received sign(B)[K | H | hash(a * b)]
# let's see if that signature can verify the reconstructed data on our side
self.remote_pub_key.verify(self.application_key + self.hello + self.shared_hash, signature)
2017-06-04 19:50:49 +00:00
return True
def clean(self, new_ephemeral_key: Optional[PrivateKey] = None) -> None:
2023-10-29 08:55:39 +00:00
super().clean(new_ephemeral_key=new_ephemeral_key)
self.a_bob = None
self.b_alice = None