116 lines
2.3 KiB
Python
116 lines
2.3 KiB
Python
"""RPC Envelope handling"""
|
|
|
|
from enum import Enum, auto
|
|
from typing import Any, Callable, Dict, List, Literal, Union
|
|
|
|
# Alias for any callable: arbitrary arguments, arbitrary return value.
Fn = Callable[..., Any]
|
|
# Dictionary mapping string keys to callables (`Fn`).
# NOTE(review): presumably keyed by RPC method name -- confirm against callers.
FnsBag = Dict[str, Fn]
|
|
|
|
# pylint: disable=too-few-public-methods
|
|
|
|
|
|
class EnvelopeKind(Enum):
    """Enumeration of the envelope kinds.

    Member values are assigned automatically by ``auto()`` in
    declaration order.
    """

    NOTIFY = auto()
    REQUEST = auto()
    RESPONSE = auto()
|
|
|
|
|
|
class EnvelopeBase:
    """Common fields shared by every envelope.

    The attributes below are declared as annotations only; this class
    assigns no values, so they must be set on an instance before
    ``as_dict`` is called.
    """

    kind: EnvelopeKind
    from_device_id: str
    envelope_id: str

    def as_dict(self) -> Dict[str, Any]:
        """Build a dictionary representation of the envelope.

        Returns:
            A new dict with the keys ``'kind'`` (the enum member's name),
            ``'fromDeviceId'`` and ``'envelopeId'``.
        """
        # The kind is emitted by member name, not by its auto() value.
        kind_name = self.kind.name
        serialized: Dict[str, Any] = {
            'kind': kind_name,
            'fromDeviceId': self.from_device_id,
            'envelopeId': self.envelope_id,
        }
        return serialized
|
|
|
|
|
|
class EnvelopeNotify(EnvelopeBase):
    """Notification envelope: the base fields plus a method and its args."""

    kind: Literal[EnvelopeKind.NOTIFY]
    method: str
    args: List[str]

    def as_dict(self) -> Dict[str, Any]:
        """Return the base dictionary extended with ``'method'`` and ``'args'``."""
        payload = super().as_dict()
        payload['method'] = self.method
        payload['args'] = self.args
        return payload
|
|
|
|
|
|
class EnvelopeRequest(EnvelopeBase):
    """Request envelope: the base fields plus a method and its args."""

    kind: Literal[EnvelopeKind.REQUEST]
    method: str
    args: List[str]

    def as_dict(self) -> Dict[str, Any]:
        """Return the base dictionary extended with ``'method'`` and ``'args'``."""
        payload = super().as_dict()
        payload['method'] = self.method
        payload['args'] = self.args
        return payload
|
|
|
|
|
|
class EnvelopeResponseWithData(EnvelopeBase):
    """Response envelope that carries a ``data`` payload."""

    kind: Literal[EnvelopeKind.RESPONSE]
    data: List[str]

    def as_dict(self) -> Dict[str, Any]:
        """Return the base dictionary extended with a ``'data'`` entry."""
        payload = super().as_dict()
        payload['data'] = self.data
        return payload
|
|
|
|
|
|
class EnvelopeResponseWithError(EnvelopeBase):
    """Response envelope that carries an ``error`` string instead of data."""

    kind: Literal[EnvelopeKind.RESPONSE]
    error: str

    def as_dict(self) -> Dict[str, Any]:
        """Convert the envelope to a dictionary.

        Returns:
            The dictionary produced by the parent ``as_dict`` with an
            additional ``'error'`` entry holding ``self.error``.
        """
        envelope = super().as_dict()
        envelope.update(
            {
                'error': self.error,
            }
        )

        # The original fell off the end of the method here and returned None,
        # contradicting the declared return type; hand back the built dict.
        return envelope
|
|
|
|
|
|
# Either flavour of response envelope: one carrying data or one carrying an error.
EnvelopeResponse = Union[EnvelopeResponseWithData, EnvelopeResponseWithError]
# Misspelled historical name, kept as an alias so existing references keep working.
EvelopeResponse = EnvelopeResponse
|
|
|
|
|
|
# Any envelope. The nested response union is flattened by ``Union``, so the
# members are: EnvelopeNotify, EnvelopeRequest, EnvelopeResponseWithData,
# EnvelopeResponseWithError.
Envelope = Union[EnvelopeNotify, EnvelopeRequest, EvelopeResponse]
|