ci: Add and configure PyLint, and make it happy

This commit is contained in:
2023-11-01 06:03:06 +01:00
parent e0cd456e77
commit d51f27d883
13 changed files with 379 additions and 81 deletions

View File

@@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Feed related functionality"""
from .models import Feed, LocalFeed, LocalMessage, Message, NoPrivateKeyException
__all__ = ("Feed", "LocalFeed", "Message", "LocalMessage", "NoPrivateKeyException")

View File

@@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Feed models"""
from base64 import b64encode
from collections import OrderedDict, namedtuple
import datetime
@@ -33,44 +35,65 @@ OrderedMsg = namedtuple("OrderedMsg", ("previous", "author", "sequence", "timest
class NoPrivateKeyException(Exception):
pass
"""Exception to raise when a private key is not available"""
def to_ordered(data):
"""Convert a dictionary to an ``OrderedDict``"""
smsg = OrderedMsg(**data)
return OrderedDict((k, getattr(smsg, k)) for k in smsg._fields)
def get_millis_1970():
"""Get the UNIX timestamp in milliseconds"""
return int(datetime.datetime.utcnow().timestamp() * 1000)
class Feed(object):
class Feed:
"""Base class for feeds"""
def __init__(self, public_key):
self.public_key = public_key
@property
def id(self):
"""The identifier of the feed"""
return tag(self.public_key).decode("ascii")
def sign(self, msg):
"""Sign a message"""
raise NoPrivateKeyException("Cannot use remote identity to sign (no private key!)")
class LocalFeed(Feed):
def __init__(self, private_key):
"""Class representing a local feed"""
def __init__(self, private_key): # pylint: disable=super-init-not-called
self.private_key = private_key
@property
def public_key(self):
"""The public key of the feed"""
return self.private_key.verify_key
def sign(self, msg):
"""Sign a message for this feed"""
return self.private_key.sign(msg).signature
class Message(object):
def __init__(self, feed, content, signature, sequence=1, timestamp=None, previous=None):
class Message:
"""Base class for SSB messages"""
def __init__( # pylint: disable=too-many-arguments
self, feed, content, signature=None, sequence=1, timestamp=None, previous=None
):
self.feed = feed
self.content = content
@@ -88,14 +111,21 @@ class Message(object):
@classmethod
def parse(cls, data, feed):
"""Parse raw message data"""
obj = loads(data, object_pairs_hook=OrderedDict)
msg = cls(feed, obj["content"], timestamp=obj["timestamp"])
return msg
def serialize(self, add_signature=True):
"""Serialize the message"""
return dumps(self.to_dict(add_signature=add_signature), indent=2).encode("utf-8")
def to_dict(self, add_signature=True):
"""Convert the message to a dictionary"""
obj = to_ordered(
{
"previous": self.previous.key if self.previous else None,
@@ -109,23 +139,34 @@ class Message(object):
if add_signature:
obj["signature"] = self.signature
return obj
def verify(self, signature):
"""Verify the signature of the message"""
return self.signature == signature
@property
def hash(self):
hash = sha256(self.serialize()).digest()
return b64encode(hash).decode("ascii") + ".sha256"
"""The cryptographic hash of the message"""
hash_ = sha256(self.serialize()).digest()
return b64encode(hash_).decode("ascii") + ".sha256"
@property
def key(self):
"""The key of the message"""
return "%" + self.hash
class LocalMessage(Message):
def __init__(self, feed, content, signature=None, sequence=1, timestamp=None, previous=None):
"""Class representing a local message"""
def __init__( # pylint: disable=too-many-arguments,super-init-not-called
self, feed, content, signature=None, sequence=1, timestamp=None, previous=None
):
self.feed = feed
self.content = content

View File

@@ -20,23 +20,32 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""MuxRPC"""
from async_generator import async_generator, yield_
from ssb.packet_stream import PSMessageType
class MuxRPCAPIException(Exception):
pass
"""Exception to raise on MuxRPC API errors"""
class MuxRPCHandler(object):
class MuxRPCHandler: # pylint: disable=too-few-public-methods
"""Base MuxRPC handler class"""
def check_message(self, msg):
"""Check message validity"""
body = msg.body
if isinstance(body, dict) and "name" in body and body["name"] == "Error":
raise MuxRPCAPIException(body["message"])
class MuxRPCRequestHandler(MuxRPCHandler):
"""MuxRPC handler for incoming RPC requests"""
def __init__(self, ps_handler):
self.ps_handler = ps_handler
@@ -47,52 +56,72 @@ class MuxRPCRequestHandler(MuxRPCHandler):
class MuxRPCSourceHandler(MuxRPCHandler):
"""MuxRPC handler for source-type RPC requests"""
def __init__(self, ps_handler):
self.ps_handler = ps_handler
@async_generator
async def __aiter__(self):
async for msg in self.ps_handler:
try:
self.check_message(msg)
await yield_(msg)
except MuxRPCAPIException:
raise
self.check_message(msg)
await yield_(msg)
class MuxRPCSinkHandlerMixin(object):
class MuxRPCSinkHandlerMixin: # pylint: disable=too-few-public-methods
"""Mixin for sink-type MuxRPC handlers"""
def send(self, msg, msg_type=PSMessageType.JSON, end=False):
"""Send a message through the stream"""
self.connection.send(msg, stream=True, msg_type=msg_type, req=self.req, end_err=end)
class MuxRPCDuplexHandler(MuxRPCSinkHandlerMixin, MuxRPCSourceHandler):
"""MuxRPC handler for duplex streams"""
def __init__(self, ps_handler, connection, req):
super(MuxRPCDuplexHandler, self).__init__(ps_handler)
super().__init__(ps_handler)
self.connection = connection
self.req = req
class MuxRPCSinkHandler(MuxRPCHandler, MuxRPCSinkHandlerMixin):
"""MuxRPC handler for sinks"""
def __init__(self, connection, req):
self.connection = connection
self.req = req
def _get_appropriate_api_handler(type_, connection, ps_handler, req):
"""Find the appropriate MuxRPC handler"""
if type_ in {"sync", "async"}:
return MuxRPCRequestHandler(ps_handler)
elif type_ == "source":
if type_ == "source":
return MuxRPCSourceHandler(ps_handler)
elif type_ == "sink":
if type_ == "sink":
return MuxRPCSinkHandler(connection, req)
elif type_ == "duplex":
if type_ == "duplex":
return MuxRPCDuplexHandler(ps_handler, connection, req)
return None
class MuxRPCRequest:
"""MuxRPC request"""
class MuxRPCRequest(object):
@classmethod
def from_message(cls, message):
"""Initialise a request from a raw packet stream message"""
body = message.body
return cls(".".join(body["name"]), body["args"])
def __init__(self, name, args):
@@ -100,22 +129,28 @@ class MuxRPCRequest(object):
self.args = args
def __repr__(self):
return "<MuxRPCRequest {0.name} {0.args}>".format(self)
return f"<MuxRPCRequest {self.name} {self.args}>"
class MuxRPCMessage(object):
class MuxRPCMessage:
"""MuxRPC message"""
@classmethod
def from_message(cls, message):
"""Initialise a MuxRPC message from a raw packet stream message"""
return cls(message.body)
def __init__(self, body):
self.body = body
def __repr__(self):
return "<MuxRPCMessage {0.body}}>".format(self)
return f"<MuxRPCMessage {self.body}>"
class MuxRPCAPI(object):
class MuxRPCAPI:
"""Generic MuxRPC API"""
def __init__(self):
self.handlers = {}
self.connection = None
@@ -129,9 +164,13 @@ class MuxRPCAPI(object):
self.process(self.connection, MuxRPCRequest.from_message(req_message))
def add_connection(self, connection):
"""Set the packet stream connection of this RPC API"""
self.connection = connection
def define(self, name):
"""Decorator to define an RPC method handler"""
def _handle(f):
self.handlers[name] = f
@@ -140,17 +179,25 @@ class MuxRPCAPI(object):
return _handle
def process(self, connection, request):
"""Process an incoming request"""
handler = self.handlers.get(request.name)
if not handler:
raise MuxRPCAPIException("Method {} not found!".format(request.name))
raise MuxRPCAPIException(f"Method {request.name} not found!")
handler(connection, request)
def call(self, name, args, type_="sync"):
"""Call an RPC method"""
if not self.connection.is_connected:
raise Exception("not connected")
raise Exception("not connected") # pylint: disable=broad-exception-raised
old_counter = self.connection.req_counter
ps_handler = self.connection.send(
{"name": name.split("."), "args": args, "type": type_},
stream=type_ in {"sink", "source", "duplex"},
)
return _get_appropriate_api_handler(type_, self.connection, ps_handler, old_counter)

View File

@@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Packet streams"""
from asyncio import Event, Queue
from enum import Enum
import logging
@@ -28,28 +30,35 @@ import struct
from time import time
from async_generator import async_generator, yield_
from secret_handshake import SHSClient, SHSServer
import simplejson
logger = logging.getLogger("packet_stream")
class PSMessageType(Enum):
"""Available message types"""
BUFFER = 0
TEXT = 1
JSON = 2
class PSStreamHandler(object):
class PSStreamHandler:
"""Packet stream handler"""
def __init__(self, req):
super(PSStreamHandler).__init__()
self.req = req
self.queue = Queue()
async def process(self, msg):
"""Process a pending message"""
await self.queue.put(msg)
async def stop(self):
"""Stop a pending request"""
await self.queue.put(None)
@async_generator
@@ -61,30 +70,40 @@ class PSStreamHandler(object):
await yield_(elem)
class PSRequestHandler(object):
class PSRequestHandler:
"""Packet stream request handler"""
def __init__(self, req):
super(PSRequestHandler).__init__()
self.req = req
self.event = Event()
self._msg = None
async def process(self, msg):
"""Process a message request"""
self._msg = msg
self.event.set()
async def stop(self):
"""Stop a pending event request"""
if not self.event.is_set():
self.event.set()
def __await__(self):
# wait until 'process' is called
yield from self.event.wait().__await__()
yield from self.event.wait().__await__() # pylint: disable=no-member
return self._msg
class PSMessage(object):
class PSMessage:
"""Packet Stream message"""
@classmethod
def from_header_body(cls, flags, req, body):
"""Parse a raw message"""
type_ = PSMessageType(flags & 0x03)
if type_ == PSMessageType.TEXT:
@@ -96,13 +115,17 @@ class PSMessage(object):
@property
def data(self):
"""The raw message data"""
if self.type == PSMessageType.TEXT:
return self.body.encode("utf-8")
elif self.type == PSMessageType.JSON:
if self.type == PSMessageType.JSON:
return simplejson.dumps(self.body).encode("utf-8")
return self.body
def __init__(self, type_, body, stream, end_err, req=None):
def __init__(self, type_, body, stream, end_err, req=None): # pylint: disable=too-many-arguments
self.stream = stream
self.end_err = end_err
self.type = type_
@@ -111,37 +134,45 @@ class PSMessage(object):
def __repr__(self):
if self.type == PSMessageType.BUFFER:
body = "{} bytes".format(len(self.body))
body = f"{len(self.body)} bytes"
else:
body = self.body
return "<PSMessage ({}): {}{} {}{}>".format(
self.type.name,
body,
"" if self.req is None else " [{}]".format(self.req),
"~" if self.stream else "",
"!" if self.end_err else "",
)
req = "" if self.req is None else f" [{self.req}]"
is_stream = "~" if self.stream else ""
err = "!" if self.end_err else ""
return f"<PSMessage ({self.type.name}): {body}{req} {is_stream}{err}>"
class PacketStream(object):
class PacketStream:
"""SSB Packet stream"""
def __init__(self, connection):
self.connection = connection
self.req_counter = 1
self._event_map = {}
self._connected = False
def register_handler(self, handler):
"""Register an RPC handler"""
self._event_map[handler.req] = (time(), handler)
@property
def is_connected(self):
"""Check if the stream is connected"""
return self.connection.is_connected
@async_generator
async def __aiter__(self):
while True:
msg = await self.read()
if not msg:
return
# filter out replies
if msg.req >= 0:
await yield_(msg)
@@ -149,20 +180,24 @@ class PacketStream(object):
async def __await__(self):
async for data in self:
logger.info("RECV: %r", data)
if data is None:
return
async def _read(self):
try:
header = await self.connection.read()
if not header or header == b"\x00" * 9:
return
flags, length, req = struct.unpack(">BIi", header)
n_packets = ceil(length / 4096)
body = b""
for n in range(n_packets):
for _ in range(n_packets):
body += await self.connection.read()
logger.debug("READ %s %s", header, len(body))
@@ -173,12 +208,14 @@ class PacketStream(object):
return None
async def read(self):
"""Read data from the packet stream"""
msg = await self._read()
if not msg:
return None
# check whether it's a reply and handle accordingly
if msg.req < 0:
t, handler = self._event_map[-msg.req]
_, handler = self._event_map[-msg.req]
await handler.process(msg)
logger.info("RESPONSE [%d]: %r", -msg.req, msg)
if msg.end_err:
@@ -200,7 +237,11 @@ class PacketStream(object):
logger.debug("WRITE HDR: %s", header)
logger.debug("WRITE DATA: %s", msg.data)
def send(self, data, msg_type=PSMessageType.JSON, stream=False, end_err=False, req=None):
def send( # pylint: disable=too-many-arguments
self, data, msg_type=PSMessageType.JSON, stream=False, end_err=False, req=None
):
"""Send data through the packet stream"""
update_counter = False
if req is None:
update_counter = True
@@ -222,5 +263,7 @@ class PacketStream(object):
return handler
def disconnect(self):
"""Disconnect the stream"""
self._connected = False
self.connection.disconnect()

View File

@@ -20,6 +20,8 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Utility functions"""
from base64 import b64decode, b64encode
import os
@@ -28,17 +30,19 @@ import yaml
class ConfigException(Exception):
pass
"""Exception to raise if there is a problem with the configuration data"""
def tag(key):
"""Create tag from publick key."""
"""Create tag from public key"""
return b"@" + b64encode(bytes(key)) + b".ed25519"
def load_ssb_secret():
"""Load SSB keys from ~/.ssb"""
with open(os.path.expanduser("~/.ssb/secret")) as f:
with open(os.path.expanduser("~/.ssb/secret"), encoding="utf-8") as f:
config = yaml.load(f, Loader=yaml.SafeLoader)
if config["curve"] != "ed25519":