The SSH host key has changed on 8 April, 2022 to this one: SHA256:573uTBSeh74kvOo0HJXi5ijdzRm8me27suzNEDlGyrQ
Python implementation of [Earthstar](https://earthstar-project.org/)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
earthsnake/earthsnake/syncer/envelope.py

115 lines
2.3 KiB

"""RPC Envelope handling"""
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Literal, Union
# Any callable, whatever its parameters or return type.
Fn = Callable[..., Any]
# Dictionary of callables keyed by string.
# NOTE(review): presumably maps RPC method names to their handlers — confirm against callers.
FnsBag = Dict[str, Fn]
# pylint: disable=too-few-public-methods
class EnvelopeKind(Enum):
    """The kinds of envelope this module defines.

    Member values are generated by ``auto()``; ``EnvelopeBase.as_dict``
    serialises a member through its ``name``, not its value.
    """

    NOTIFY = auto()
    REQUEST = auto()
    RESPONSE = auto()
class EnvelopeBase:
    """Fields and serialisation shared by every envelope type.

    The attributes below are only declared, never assigned by this class;
    they have to be set on an instance before ``as_dict`` is called.
    """

    kind: EnvelopeKind
    from_device_id: str
    envelope_id: str

    def as_dict(self) -> Dict[str, Any]:
        """Return the common envelope fields as a new dictionary.

        The enum member is emitted by name under ``kind``; the two
        identifiers are emitted under camelCase keys.
        """
        kind_name = self.kind.name
        return dict(
            kind=kind_name,
            fromDeviceId=self.from_device_id,
            envelopeId=self.envelope_id,
        )
class EnvelopeNotify(EnvelopeBase):
    """Envelope carrying a notification: a method name plus its arguments."""

    kind: Literal[EnvelopeKind.NOTIFY]
    method: str
    args: List[str]

    def as_dict(self) -> Dict[str, Any]:
        """Return the base fields extended with ``method`` and ``args``."""
        return {
            **super().as_dict(),
            'method': self.method,
            'args': self.args,
        }
class EnvelopeRequest(EnvelopeBase):
    """Envelope carrying a request: a method name plus its arguments."""

    kind: Literal[EnvelopeKind.REQUEST]
    method: str
    args: List[str]

    def as_dict(self) -> Dict[str, Any]:
        """Return the base fields extended with ``method`` and ``args``."""
        fields = super().as_dict()
        fields['method'] = self.method
        fields['args'] = self.args
        return fields
class EnvelopeResponseWithData(EnvelopeBase):
    """Response envelope that carries a ``data`` payload."""

    kind: Literal[EnvelopeKind.RESPONSE]
    data: List[str]

    def as_dict(self) -> Dict[str, Any]:
        """Return the base fields extended with ``data``."""
        return {
            **super().as_dict(),
            'data': self.data,
        }
class EnvelopeResponseWithError(EnvelopeBase):
    """Response envelope that carries an ``error`` message instead of data."""

    kind: Literal[EnvelopeKind.RESPONSE]
    error: str

    def as_dict(self) -> Dict[str, Any]:
        """Return the base fields extended with ``error``.

        Returns:
            A new dictionary holding ``kind``, ``fromDeviceId``,
            ``envelopeId`` and ``error``.
        """
        envelope = super().as_dict()
        envelope['error'] = self.error
        # The dictionary was previously built but never returned, so the
        # method yielded None despite its annotation.
        return envelope
# Either flavour of response envelope: one carrying data or one carrying an error.
EnvelopeResponse = Union[EnvelopeResponseWithData, EnvelopeResponseWithError]
# Original (misspelled) name, kept as an alias so existing imports keep working.
EvelopeResponse = EnvelopeResponse

# Any envelope type defined in this module.
Envelope = Union[
    EnvelopeNotify,
    EnvelopeRequest,
    EnvelopeResponseWithData,
    EnvelopeResponseWithError,
]