PySecretHandshake/tests/test_network.py

200 lines
6.2 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# SPDX-License-Identifier: MIT
#
# Copyright (c) 2017 PySecretHandshake contributors (see AUTHORS for more details)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Tests for the networking components"""
from asyncio import Event, wait_for
import os
from typing import Any, Awaitable, Callable, Tuple
from nacl.signing import SigningKey
import pytest
from pytest_mock import MockerFixture
from secret_handshake import SHSClient, SHSServer
from secret_handshake.boxstream import BoxStreamKeys
from secret_handshake.network import SHSDuplexStream
from .helpers import AsyncBuffer
class DummyCrypto:
"""Dummy crypto module, pretends everything is fine."""
def verify_server_challenge(self, _: bytes) -> bool:
"""Verify the server challenge"""
return True
def verify_challenge(self, _: bytes) -> bool:
"""Verify the challenge data"""
return True
def verify_server_accept(self, _: bytes) -> bool:
"""Verify servers accept message"""
return True
def generate_challenge(self) -> bytes:
"""Generate authentication challenge"""
return b"CHALLENGE"
def generate_client_auth(self) -> bytes:
"""Generate client authentication data"""
return b"AUTH"
def verify_client_auth(self, _: bytes) -> bool:
"""Verify client authentication data"""
return True
def generate_accept(self) -> bytes:
"""Generate an ACCEPT message"""
return b"ACCEPT"
def get_box_keys(self) -> BoxStreamKeys:
"""Get box keys"""
return {
"encrypt_key": b"x" * 32,
"encrypt_nonce": b"x" * 32,
"decrypt_key": b"x" * 32,
"decrypt_nonce": b"x" * 32,
"shared_secret": b"x" * 32,
}
def clean(self) -> None:
"""Clean up internal data"""
def _dummy_boxstream(stream: AsyncBuffer, **_: Any) -> AsyncBuffer:
"""Identity boxstream, no transformation."""
return stream
def _client_stream_mocker() -> (
Tuple[AsyncBuffer, AsyncBuffer, Callable[[str, int], Awaitable[Tuple[AsyncBuffer, AsyncBuffer]]]]
):
reader = AsyncBuffer(b"xxx")
writer = AsyncBuffer(b"xxx")
async def _create_mock_streams(
host: str, port: int # pylint: disable=unused-argument
) -> Tuple[AsyncBuffer, AsyncBuffer]:
return reader, writer
return reader, writer, _create_mock_streams
def _server_stream_mocker() -> (
Tuple[
AsyncBuffer,
AsyncBuffer,
Callable[[Callable[[AsyncBuffer, AsyncBuffer], Awaitable[None]], str, int], Awaitable[None]],
]
):
reader = AsyncBuffer(b"xxx")
writer = AsyncBuffer(b"xxx")
async def _create_mock_server(
cb: Callable[[AsyncBuffer, AsyncBuffer], Awaitable[None]],
host: str, # pylint: disable=unused-argument
port: int, # pylint: disable=unused-argument
) -> None:
await cb(reader, writer)
return reader, writer, _create_mock_server
@pytest.mark.asyncio
async def test_client(mocker: MockerFixture) -> None:
"""Test the client"""
reader, _, _create_mock_streams = _client_stream_mocker()
mocker.patch("secret_handshake.network.open_connection", new=_create_mock_streams)
mocker.patch("secret_handshake.boxstream.BoxStream", new=_dummy_boxstream)
mocker.patch("secret_handshake.boxstream.UnboxStream", new=_dummy_boxstream)
client = SHSClient("shop.local", 1111, SigningKey.generate(), os.urandom(32))
client.crypto = DummyCrypto() # type: ignore[assignment]
await client.open()
reader.append(b"TEST")
assert (await client.read()) == b"TEST"
client.disconnect()
@pytest.mark.asyncio
async def test_server(mocker: MockerFixture) -> None:
"""Test the server"""
resolve = Event()
async def _on_connect(_: Any) -> None:
server.disconnect()
resolve.set()
_, _, _create_mock_server = _server_stream_mocker()
mocker.patch("secret_handshake.network.start_server", new=_create_mock_server)
mocker.patch("secret_handshake.boxstream.BoxStream", new=_dummy_boxstream)
mocker.patch("secret_handshake.boxstream.UnboxStream", new=_dummy_boxstream)
server = SHSServer("shop.local", 1111, SigningKey.generate(), os.urandom(32))
server.crypto = DummyCrypto() # type: ignore[assignment]
server.on_connect(_on_connect)
await server.listen()
await wait_for(resolve.wait(), 5)
def test_duplex_write(mocker: MockerFixture) -> None:
"""Test the writing capabilities of the duplex stream"""
d_stream = SHSDuplexStream()
d_stream.write_stream = mocker.AsyncMock()
d_stream.write(b"thing")
d_stream.write_stream.write.assert_called_once_with(b"thing")
def test_duplex_close_no_write_stream() -> None:
"""Test if SHSDuplexStreams close method doesnt fail if there is no write stream"""
d_stream = SHSDuplexStream()
assert d_stream.write_stream is None
d_stream.close()
# We cannot really do assertions here. If there is not set (it is None), the above call would fail
def test_duplex_stream_aiter() -> None:
"""Test if the __aiter__ method of SHSDuplexStream returns the stream itself"""
d_stream = SHSDuplexStream()
assert d_stream.__aiter__() is d_stream # pylint: disable=unnecessary-dunder-call