earthsnake/earthsnake/document/es4.py

361 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Format validator for raw (JSON) documents in the es.4 format"""
from datetime import datetime, timezone
from hashlib import sha256
from typing import Any, Dict, Mapping, Optional, Tuple, TypedDict
import fastjsonschema
from ..base32 import base32_bytes_to_string
from ..exc import ValidationError
from ..identity import Identity
from ..path import Path
from ..share import Share
from ..types import B32_CHAR
from . import Document
class RawDocument(TypedDict, total=False):
    """A raw Document object

    This mirrors the keys of the JSON representation of an es.4 document.  All keys are optional
    on the type level (``total=False``); which ones are actually required is decided by the
    document schema, not by this type.
    """

    # The format identifier of the document (the es.4 schema only accepts 'es.4')
    format: str
    # Address of the author
    author: str
    # Signature over the document hash
    signature: str
    # The document content
    content: str
    # Hash of ``content``
    contentHash: str
    # Expiry time of an ephemeral document as an integer timestamp; ``None`` if the document
    # never expires
    deleteAfter: Optional[int]
    # The path of the document within its share
    path: str
    # Address of the share (workspace) the document belongs to
    workspace: str
    # Time of writing as an integer timestamp
    timestamp: int
class Es4Document(Document):  # pylint: disable=too-many-instance-attributes
    """Validator for the 'es.4' format

    Checks if documents are spec-compliant before ingesting, and signs them according to spec.

    See https://earthstar-project.org/specs/data-spec
    """

    # Tolerance for accepting messages from the future (because of clock skew between peers)
    FUTURE_CUTOFF_MICROSECONDS = 600000000  # 10 minutes

    # Allowed valid range of timestamps (in microseconds, not milliseconds); these are taken from
    # the reference implementation, but they are not specced.
    MIN_TIMESTAMP = 10000000000000  # 1970-04-26T17:46:40Z
    MAX_TIMESTAMP = 9007199254740990  # 2255-06-05T23:47:34.740990Z

    # 4 million bytes = 4 megabytes (measured as bytes, not as utf-8 characters)
    MAX_CONTENT_LENGTH = 4000000

    # Reference point for converting between ``datetime`` objects and integer microseconds
    _EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

    CORE_SCHEMA = {
        'title': 'Earthstar document format es.4',
        'description': 'This schema describes a valid Earthstar document in the es.4 format',
        'type': 'object',
        'properties': {
            'format': {
                'description': 'The format identifier. Must be "es.4"',
                'enum': ['es.4'],
            },
            'author': {
                'type': 'string',
                'pattern': Identity.PATTERN,
            },
            'content': {
                'type': 'string',
                # JSON Schema counts characters here, not bytes; the byte limit is enforced
                # separately in _check_basic_document_validity()
                'maxLength': MAX_CONTENT_LENGTH,
            },
            'contentHash': {
                'type': 'string',
                'pattern': f'^b[{B32_CHAR}]{{52}}$',
            },
            'deleteAfter': {
                'oneOf': [
                    {
                        'type': 'integer',
                        'minimum': MIN_TIMESTAMP,
                        'maximum': MAX_TIMESTAMP,
                    },
                    {
                        'type': 'null',
                    },
                ],
            },
            'path': {
                'type': 'string',
                'pattern': Path.PATTERN,
                'minLength': 2,
                'maxLength': 512,
            },
            'signature': {
                'type': 'string',
                'pattern': f'^b[{B32_CHAR}]{{103}}$',
            },
            'timestamp': {
                'type': 'integer',
                'minimum': MIN_TIMESTAMP,
                'maximum': MAX_TIMESTAMP,
            },
            'workspace': {
                'type': 'string',
                'pattern': Share.PATTERN,
            },
        },
        'required': [
            'format',
            'timestamp',
            'deleteAfter',
            'author',
            'path',
            'workspace',
            'signature',
            'contentHash',
            'content',
        ],
        'additionalProperties': False,
    }

    __document_format__ = 'es.4'

    _validator = staticmethod(fastjsonschema.compile(CORE_SCHEMA))

    def __init__(  # pylint: disable=too-many-arguments
        self,
        author: Identity,
        share: Share,
        path: Path,
        timestamp: Optional[datetime] = None,
        content: Optional[str] = None,
        content_hash: Optional[str] = None,
        signature: Optional[str] = None,
        delete_after: Optional[datetime] = None,
    ):
        """Create a new document

        :param author: the identity that wrote (or will sign) the document
        :param share: the share the document belongs to
        :param path: the path of the document
        :param timestamp: the time of writing; defaults to the current time (timezone-aware, UTC)
        :param content: the content of the document; defaults to the empty string
        :param content_hash: a precomputed hash of ``content``; calculated lazily if omitted
        :param signature: an already existing signature, if any
        :param delete_after: the expiry time of an ephemeral document, if any
        """

        self.author: Identity = author
        self.path = path
        self.signature = signature
        self._content = content or ''
        self._content_hash = content_hash
        self.delete_after = delete_after
        self.share = share
        # Use an aware datetime: a naive ``utcnow()`` value would be read as local time when it
        # is converted to a timestamp, skewing it by the UTC offset of the host
        self.timestamp = timestamp or datetime.now(timezone.utc)

    @classmethod
    def _datetime_from_microseconds(cls, value: int) -> datetime:
        """Convert integer microseconds since the Unix epoch to an aware UTC ``datetime``

        The seconds and microseconds are handled as integers so no precision is lost to
        floating point rounding.
        """

        seconds, microseconds = divmod(value, 1000000)

        return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=microseconds)

    @classmethod
    def _datetime_to_microseconds(cls, value: datetime) -> int:
        """Convert a ``datetime`` to integer microseconds since the Unix epoch

        Naive datetimes are interpreted as local time, exactly like ``datetime.timestamp()``
        does.  The result is calculated with integer arithmetic only, so converting a timestamp
        to a ``datetime`` and back always yields the original number.
        """

        delta = value.astimezone(timezone.utc) - cls._EPOCH

        return (delta.days * 86400 + delta.seconds) * 1000000 + delta.microseconds

    @classmethod
    def from_json(cls, raw_document: Dict[str, Any]) -> 'Es4Document':
        """Validate raw_document as an es.4 document and create an ``Es4Document`` from it

        :param raw_document: the parsed JSON object of the document
        :returns: a new ``Es4Document``
        :raises ValidationError: if anything is wrong
        """

        # do this first to ensure we have all the right datatypes in the right fields
        cls._check_basic_document_validity(raw_document)

        # this is the most likely to fail under regular conditions, so do it next (because of
        # clock skew and expired ephemeral documents)
        raw_timestamp = raw_document['timestamp']
        raw_delete_after = raw_document['deleteAfter']
        cls._check_timestamp_is_ok(raw_timestamp, raw_delete_after)

        timestamp = cls._datetime_from_microseconds(raw_timestamp)
        delete_after = (
            None if raw_delete_after is None else cls._datetime_from_microseconds(raw_delete_after)
        )

        author = Identity.from_address(raw_document['author'])
        share = Share.from_address(raw_document['workspace'])
        path = Path(raw_document['path'])

        if not path.can_write(author):
            raise ValidationError(f'Author {author} cannot write to path {path}')

        # do this last since it might be slow on a large document
        cls._check_content_matches_hash(raw_document['content'], raw_document['contentHash'])

        document = cls(
            author,
            share,
            path,
            timestamp=timestamp,
            content=raw_document['content'],
            content_hash=raw_document['contentHash'],
            signature=raw_document['signature'],
            delete_after=delete_after,
        )
        document.validate_signature()

        return document

    @property
    def content_hash(self) -> str:
        """The hash of the document's content

        Calculated on first access if it was not supplied to the constructor.
        """

        if not self._content_hash:
            self._content_hash = self.hash_value(self.content)

        return self._content_hash

    @staticmethod
    def hash_value(value: str) -> str:
        """Calculate the SHA-256 digest of the value and return its Base32 representation"""

        return base32_bytes_to_string(sha256(value.encode('utf-8')).digest())

    @property
    def content(self) -> str:
        """The content of the document"""

        return self._content

    @content.setter
    def content(self, value: str) -> None:
        # Keep the stored hash in sync with the new content
        self._content = value
        self._content_hash = self.hash_value(value)

    @property
    def content_length(self) -> int:
        """The length of the document's content

        NOTE(review): this counts characters, not UTF-8 bytes -- confirm which one the base class
        expects.
        """

        return len(self._content)

    def generate_hash(self) -> str:
        """Calculate the hash of the document

        This hash will be signed with the author's key to populate the signature property.

        :returns: the Base32 representation of the SHA-256 digest of the document's fields
        """

        hash_keys: Dict[str, Any] = {
            'author': self.author,
            'contentHash': self.content_hash,
            'deleteAfter': (
                None
                if self.delete_after is None
                else self._datetime_to_microseconds(self.delete_after)
            ),
            'format': self.__document_format__,
            'path': self.path,
            'timestamp': self._datetime_to_microseconds(self.timestamp),
            'workspace': self.share,
        }

        hasher = sha256()

        # Fields are hashed in alphabetical order of their names; unset fields are left out
        for key in sorted(hash_keys):
            value = hash_keys[key]

            if value is None:
                continue

            hasher.update(f'{key}\t{value}\n'.encode('utf-8'))

        return base32_bytes_to_string(hasher.digest())

    def sign(self, identity: Optional[Identity] = None) -> None:
        """Sign the document and store the result in the ``signature`` attribute

        :param identity: the identity to sign with; must be equal to the document's author.  If
            omitted, the document's ``author`` object is used for signing.
        :raises ValidationError: if ``identity`` is not the author of the document
        """

        if identity:
            if identity != self.author:
                raise ValidationError(
                    "when signing a document, keypair address must match document author"
                )

            self.author = identity

        self.signature = self.author.sign(self.generate_hash())

    @classmethod
    def remove_extra_fields(cls, doc: Dict[str, Any]) -> Tuple[RawDocument, Mapping[str, Any]]:
        """Return a copy of the doc without extra fields, plus the extra fields as a separate object

        This should be run before validating the document.  The output doc will be more likely to
        be valid once the extra fields have been removed.

        :param doc: the parsed JSON object of the document
        :returns: a tuple of the document restricted to the fields known to the schema, and a
            mapping of all the extra fields
        :raises ValidationError: if the input is not a plain JSON object, or if the name of an
            extra field does not start with an underscore
        """

        if not isinstance(doc, dict):
            raise ValidationError('Document is not a plain JSON object')

        valid_keys = set(cls.CORE_SCHEMA['properties'])  # type: ignore

        doc2: RawDocument = {}
        extras: Dict[str, Any] = {}

        for key, val in doc.items():
            if key in valid_keys:
                doc2[key] = val  # type: ignore
                continue

            if not key.startswith('_'):
                raise ValidationError(
                    "extra document fields must have names starting with an underscore"
                )

            extras[key] = val

        return (doc2, extras)

    # These are broken out for easier unit testing. They will not normally be used directly; use
    # from_json() instead.

    @classmethod
    def _check_basic_document_validity(cls, doc: Dict[str, Any]) -> None:
        """Check for correct fields and datatypes

        :raises ValidationError: if the document does not match the schema, or if its content is
            longer than ``MAX_CONTENT_LENGTH`` bytes
        """

        try:
            cls._validator(doc)
        except Exception as exc:  # pylint: disable=broad-except
            # Any validator failure means the document is invalid; interpreter level exceptions
            # like KeyboardInterrupt are deliberately not caught
            raise ValidationError(exc) from None

        # The schema can only limit the number of characters, but the limit is meant in bytes
        try:
            content_size = len(doc['content'].encode('utf-8'))
        except UnicodeEncodeError:
            raise ValidationError('content cannot be encoded as UTF-8') from None

        if content_size > cls.MAX_CONTENT_LENGTH:
            raise ValidationError(f'content is longer than {cls.MAX_CONTENT_LENGTH} bytes')

        # TODO: is there more to check?

    @classmethod
    def _check_timestamp_is_ok(cls, timestamp: int, delete_after: Optional[int]) -> None:
        """Check for valid timestamp, and expired ephemeral documents

        timestamp and deleteAfter are already verified as good numbers by the schema checker:

        - in the right range of min and max allowed timestamps
        - integers, and not NaN or infinity

        Timestamp must not be too far in the future (but allow some clock skew).

        :param timestamp: the time of writing, in microseconds
        :param delete_after: the expiry time in microseconds, or ``None`` if there is none
        :raises ValidationError: if the timestamp is too far in the future, or the document has
            already expired, or it expires before it was created
        """

        now = cls._datetime_to_microseconds(datetime.now(timezone.utc))

        if timestamp > now + cls.FUTURE_CUTOFF_MICROSECONDS:
            raise ValidationError("timestamp too far in the future")

        # Everything below only concerns ephemeral documents
        if delete_after is None:
            return

        # Only valid if expiration date is in the future
        if now > delete_after:
            raise ValidationError("ephemeral doc has expired")

        # Can't expire before it was created, that makes no sense
        if delete_after <= timestamp:
            raise ValidationError("ephemeral doc expired before it was created")

    def validate_signature(self) -> None:
        """Check if the signature is good

        :raises ValidationError: if the document hash cannot be calculated, or if the document has
            no signature, or if the signature does not verify against the author
        """

        try:
            doc_hash = self.generate_hash()
        except Exception as exc:  # pylint: disable=broad-except
            raise ValidationError(f'Cannot check signature: {exc}') from None

        if not self.signature:
            raise ValidationError('document has no signature assigned')

        if not self.author.verify(doc_hash, self.signature):
            raise ValidationError('signature is invalid')

    @staticmethod
    def _check_content_matches_hash(content: str, content_hash: str) -> None:
        """Ensure the contentHash matches the actual content

        :raises ValidationError: if the hash of ``content`` is not ``content_hash``
        """

        if Es4Document.hash_value(content) != content_hash:
            raise ValidationError("content does not match contentHash")