ci: Lint source
This commit is contained in:
@@ -26,8 +26,9 @@ from io import BytesIO
|
||||
class AsyncBuffer(BytesIO):
|
||||
"""Just a BytesIO with an async read method."""
|
||||
|
||||
async def read(self, n=None):
|
||||
v = super(AsyncBuffer, self).read(n)
|
||||
async def read(self, n=None): # pylint: disable=invalid-overridden-method
|
||||
v = super().read(n)
|
||||
|
||||
return v
|
||||
|
||||
readexactly = read
|
||||
|
@@ -18,6 +18,7 @@
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
"""Tests for the box stream"""
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -27,17 +28,18 @@ from .helpers import AsyncBuffer, async_comprehend
|
||||
from .test_crypto import CLIENT_ENCRYPT_KEY, CLIENT_ENCRYPT_NONCE
|
||||
|
||||
MESSAGE_1 = (
|
||||
b"\xcev\xedE\x06l\x02\x13\xc8\x17V\xfa\x8bZ?\x88B%O\xb0L\x9f\x8e\x8c0y\x1dv\xc0\xc9\xf6\x9d\xc2\xdf\xdb" b"\xee\x9d"
|
||||
b"\xcev\xedE\x06l\x02\x13\xc8\x17V\xfa\x8bZ?\x88B%O\xb0L\x9f\x8e\x8c0y\x1dv\xc0\xc9\xf6\x9d\xc2\xdf\xdb\xee\x9d"
|
||||
)
|
||||
MESSAGE_2 = b"\x141\xd63\x13d\xd1\xecZ\x9b\xd0\xd4\x03\xcdR?'\xaa.\x89I\x92I\xf9guL\xaa\x06?\xea\xca/}\x88*\xb2"
|
||||
MESSAGE_3 = (
|
||||
b'\xcbYY\xf1\x0f\xa5O\x13r\xa6"\x15\xc5\x9d\r.*\x0b\x92\x10m\xa6(\x0c\x0c\xc61\x80j\x81)\x800\xed\xda' b"\xad\xa1"
|
||||
b'\xcbYY\xf1\x0f\xa5O\x13r\xa6"\x15\xc5\x9d\r.*\x0b\x92\x10m\xa6(\x0c\x0c\xc61\x80j\x81)\x800\xed\xda\xad\xa1'
|
||||
)
|
||||
MESSAGE_CLOSED = b"\xb1\x14hU'\xb5M\xa6\"\x03\x9duy\xa1\xd4evW,\xdcE\x18\xe4+ C4\xe8h\x96\xed\xc5\x94\x80"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_boxstream():
|
||||
"""Test stream boxing"""
|
||||
buffer = AsyncBuffer()
|
||||
box_stream = BoxStream(buffer, CLIENT_ENCRYPT_KEY, CLIENT_ENCRYPT_NONCE)
|
||||
box_stream.write(b"foo")
|
||||
@@ -62,6 +64,8 @@ async def test_boxstream():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_unboxstream():
|
||||
"""Test stream unboxing"""
|
||||
|
||||
buffer = AsyncBuffer(MESSAGE_1 + MESSAGE_2 + MESSAGE_3 + MESSAGE_CLOSED)
|
||||
buffer.seek(0)
|
||||
|
||||
@@ -73,6 +77,8 @@ async def test_unboxstream():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_long_packets():
|
||||
"""Test for receiving long packets"""
|
||||
|
||||
data_size = 6 * 1024
|
||||
data = bytes(n % 256 for n in range(data_size))
|
||||
|
||||
|
@@ -18,6 +18,7 @@
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
"""Tests for the crypto components"""
|
||||
|
||||
import hashlib
|
||||
|
||||
@@ -34,18 +35,24 @@ SERVER_EPH_KEY_SEED = b"ed\x1c\x01\x03s\x04\xdc\x8e`\xd6Z\xd0u;\xcbX\x91\xd8ZO\x
|
||||
CLIENT_EPH_KEY_SEED = b"u8\xd0\xe3\x85d_Pz\x0c\xf5\xfd\x15\xce2p#\xb0\xf0\x9f\xe6!\xe1\xcb\xf6\x93\t\xebr{1\x8b"
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@pytest.fixture
|
||||
def server():
|
||||
"""A testing SHS server"""
|
||||
|
||||
server_key = SigningKey(SERVER_KEY_SEED)
|
||||
server_eph_key = PrivateKey(SERVER_EPH_KEY_SEED)
|
||||
|
||||
return SHSServerCrypto(server_key, server_eph_key, application_key=APP_KEY)
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
@pytest.fixture
|
||||
def client():
|
||||
"""A testing SHS client"""
|
||||
|
||||
client_key = SigningKey(CLIENT_KEY_SEED)
|
||||
server_key = SigningKey(SERVER_KEY_SEED)
|
||||
client_eph_key = PrivateKey(CLIENT_EPH_KEY_SEED)
|
||||
|
||||
return SHSClientCrypto(client_key, bytes(server_key.verify_key), client_eph_key, application_key=APP_KEY)
|
||||
|
||||
|
||||
@@ -55,19 +62,19 @@ CLIENT_CHALLENGE = (
|
||||
b"\xe5\xc6K\xae\x94\xdbVt\x84\xdc\x1c@+D\x1c%"
|
||||
)
|
||||
CLIENT_AUTH = (
|
||||
b"\xf2\xaf?z\x15\x10\xd0\xf0\xdf\xe3\x91\xfe\x14\x1c}z\xab\xeey\xf5\xef\xfc\xa1EdV\xf2T\x95s[!$z"
|
||||
b"\xeb\x8f\x1b\x96JP\x17^\x92\xc8\x9e\xb4*5`\xf2\x8fI.\x93\xb9\x14:\xca@\x06\xff\xd1\xf1J\xc8t\xc4"
|
||||
b"\xd8\xc3$[\xc5\x94je\x83\x00%\x99\x10\x16\xb1\xa2\xb2\xb7\xbf\xc9\x88\x14\xb9\xbb^\tzq\xa4\xef\xc5"
|
||||
b"\xf5\x1f7#\xed\x92X\xb2\xe3\xe5\x8b[t3"
|
||||
b"\xf2\xaf?z\x15\x10\xd0\xf0\xdf\xe3\x91\xfe\x14\x1c}z\xab\xeey\xf5\xef\xfc\xa1EdV\xf2T\x95s[!$"
|
||||
b"z\xeb\x8f\x1b\x96JP\x17^\x92\xc8\x9e\xb4*5`\xf2\x8fI.\x93\xb9\x14:\xca@\x06\xff\xd1\xf1J\xc8t"
|
||||
b"\xc4\xd8\xc3$[\xc5\x94je\x83\x00%\x99\x10\x16\xb1\xa2\xb2\xb7\xbf\xc9\x88\x14\xb9\xbb^\tzq"
|
||||
b"\xa4\xef\xc5\xf5\x1f7#\xed\x92X\xb2\xe3\xe5\x8b[t3"
|
||||
)
|
||||
SERVER_CHALLENGE = (
|
||||
b"S\\\x06\x8d\xe5\xeb&*\xb8\x0bp\xb3Z\x8e\\\x85\x14\xaa\x1c\x8di\x9d\x7f\xa9\xeawl\xb9}\x85\xc3ik"
|
||||
b"\x0c ($E\xb4\x8ax\xc4)t<\xd7\x8b\xd6\x07\xb7\xecw\x84\r\xe1-Iz`\xeb\x04\x89\xd6{"
|
||||
b"S\\\x06\x8d\xe5\xeb&*\xb8\x0bp\xb3Z\x8e\\\x85\x14\xaa\x1c\x8di\x9d\x7f\xa9\xeawl\xb9}\x85\xc3"
|
||||
b"ik\x0c ($E\xb4\x8ax\xc4)t<\xd7\x8b\xd6\x07\xb7\xecw\x84\r\xe1-Iz`\xeb\x04\x89\xd6{"
|
||||
)
|
||||
SERVER_ACCEPT = (
|
||||
b'\xb4\xd0\xea\xfb\xfb\xf6s\xcc\x10\xc4\x99\x95"\x13 y\xa6\xea.G\xeed\x8d=t9\x88|\x94\xd1\xbcK\xd47'
|
||||
b"\xd8\xbcG1h\xac\xd0\xeb*\x1f\x8d\xae\x0b\x91G\xa1\xe6\x96b\xf2\xda90u\xeb_\xab\xdb\xcb%d7}\xb5\xce"
|
||||
b"(k\x15\xe3L\x9d)\xd5\xa1|:"
|
||||
b'\xb4\xd0\xea\xfb\xfb\xf6s\xcc\x10\xc4\x99\x95"\x13 y\xa6\xea.G\xeed\x8d=t9\x88|\x94\xd1\xbcK'
|
||||
b"\xd47\xd8\xbcG1h\xac\xd0\xeb*\x1f\x8d\xae\x0b\x91G\xa1\xe6\x96b\xf2\xda90u\xeb_\xab\xdb\xcb%d"
|
||||
b"7}\xb5\xce(k\x15\xe3L\x9d)\xd5\xa1|:"
|
||||
)
|
||||
INTER_SHARED_SECRET = (
|
||||
b"vf\xd82\xaeU\xda]\x08\x9eZ\xd6\x06\xcc\xd3\x99\xfd\xce\xc5\x16e8n\x9a\x04\x04\x84\xc5\x1a" b"\x8f\xf2M"
|
||||
@@ -83,7 +90,9 @@ CLIENT_ENCRYPT_NONCE = b"S\\\x06\x8d\xe5\xeb&*\xb8\x0bp\xb3Z\x8e\\\x85\x14\xaa\x
|
||||
CLIENT_DECRYPT_NONCE = b"d\xe8\xccD\xec\xb9E\xbb\xaa\xa7\x7f\xe38\x15\x16\xef\xca\xd22u\x1d\xfe<\xe7"
|
||||
|
||||
|
||||
def test_handshake(client, server):
|
||||
def test_handshake(client, server): # pylint: disable=redefined-outer-name
|
||||
"""Test the handshake procedure"""
|
||||
|
||||
client_challenge = client.generate_challenge()
|
||||
assert client_challenge == CLIENT_CHALLENGE
|
||||
assert server.verify_challenge(client_challenge)
|
||||
|
@@ -18,40 +18,59 @@
|
||||
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
# SOFTWARE.
|
||||
|
||||
"""Tests for the networking components"""
|
||||
|
||||
import os
|
||||
from asyncio import Event, wait_for
|
||||
|
||||
import pytest
|
||||
from nacl.signing import SigningKey
|
||||
|
||||
from secret_handshake import SHSClient, SHSServer
|
||||
|
||||
from .helpers import AsyncBuffer
|
||||
|
||||
|
||||
class DummyCrypto(object):
|
||||
class DummyCrypto:
|
||||
"""Dummy crypto module, pretends everything is fine."""
|
||||
|
||||
def verify_server_challenge(self, data):
|
||||
def verify_server_challenge(self, _):
|
||||
"""Verify the server challenge"""
|
||||
|
||||
return True
|
||||
|
||||
def verify_challenge(self, data):
|
||||
def verify_challenge(self, _):
|
||||
"""Verify the challenge data"""
|
||||
|
||||
return True
|
||||
|
||||
def verify_server_accept(self, data):
|
||||
def verify_server_accept(self, _):
|
||||
"""Verify server’s accept message"""
|
||||
return True
|
||||
|
||||
def generate_challenge(self):
|
||||
"""Generate authentication challenge"""
|
||||
|
||||
return b"CHALLENGE"
|
||||
|
||||
def generate_client_auth(self):
|
||||
"""Generate client authentication data"""
|
||||
|
||||
return b"AUTH"
|
||||
|
||||
def verify_client_auth(self, data):
|
||||
def verify_client_auth(self, _):
|
||||
"""Verify client authentication data"""
|
||||
|
||||
return True
|
||||
|
||||
def generate_accept(self):
|
||||
"""Generate an ACCEPT message"""
|
||||
|
||||
return b"ACCEPT"
|
||||
|
||||
def get_box_keys(self):
|
||||
"""Get box keys"""
|
||||
|
||||
return {
|
||||
"encrypt_key": b"x" * 32,
|
||||
"encrypt_nonce": b"x" * 32,
|
||||
@@ -60,10 +79,10 @@ class DummyCrypto(object):
|
||||
}
|
||||
|
||||
def clean(self):
|
||||
return
|
||||
"""Clean up internal data"""
|
||||
|
||||
|
||||
def _dummy_boxstream(stream, **kwargs):
|
||||
def _dummy_boxstream(stream, **_):
|
||||
"""Identity boxstream, no tansformation."""
|
||||
return stream
|
||||
|
||||
@@ -72,7 +91,7 @@ def _client_stream_mocker():
|
||||
reader = AsyncBuffer(b"xxx")
|
||||
writer = AsyncBuffer(b"xxx")
|
||||
|
||||
async def _create_mock_streams(host, port):
|
||||
async def _create_mock_streams(host, port): # pylint: disable=unused-argument
|
||||
return reader, writer
|
||||
|
||||
return reader, writer, _create_mock_streams
|
||||
@@ -82,7 +101,7 @@ def _server_stream_mocker():
|
||||
reader = AsyncBuffer(b"xxx")
|
||||
writer = AsyncBuffer(b"xxx")
|
||||
|
||||
async def _create_mock_server(cb, host, port):
|
||||
async def _create_mock_server(cb, host, port): # pylint: disable=unused-argument
|
||||
await cb(reader, writer)
|
||||
|
||||
return reader, writer, _create_mock_server
|
||||
@@ -90,13 +109,13 @@ def _server_stream_mocker():
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_client(mocker):
|
||||
reader, writer, _create_mock_streams = _client_stream_mocker()
|
||||
"""Test the client"""
|
||||
|
||||
reader, _, _create_mock_streams = _client_stream_mocker()
|
||||
mocker.patch("asyncio.open_connection", new=_create_mock_streams)
|
||||
mocker.patch("secret_handshake.boxstream.BoxStream", new=_dummy_boxstream)
|
||||
mocker.patch("secret_handshake.boxstream.UnboxStream", new=_dummy_boxstream)
|
||||
|
||||
from secret_handshake import SHSClient
|
||||
|
||||
client = SHSClient("shop.local", 1111, SigningKey.generate(), os.urandom(32))
|
||||
client.crypto = DummyCrypto()
|
||||
|
||||
@@ -108,15 +127,15 @@ async def test_client(mocker):
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_server(mocker):
|
||||
from secret_handshake import SHSServer
|
||||
"""Test the server"""
|
||||
|
||||
resolve = Event()
|
||||
|
||||
async def _on_connect(conn):
|
||||
async def _on_connect(_):
|
||||
server.disconnect()
|
||||
resolve.set()
|
||||
|
||||
reader, writer, _create_mock_server = _server_stream_mocker()
|
||||
_, _, _create_mock_server = _server_stream_mocker()
|
||||
mocker.patch("asyncio.start_server", new=_create_mock_server)
|
||||
mocker.patch("secret_handshake.boxstream.BoxStream", new=_dummy_boxstream)
|
||||
mocker.patch("secret_handshake.boxstream.UnboxStream", new=_dummy_boxstream)
|
||||
|
Reference in New Issue
Block a user