The SSH host key has changed on 8 April, 2022 to this one: SHA256:573uTBSeh74kvOo0HJXi5ijdzRm8me27suzNEDlGyrQ

Create the es.4 document format

Gergely Polonkai 3 months ago
parent 22d33a5d41
commit 280652d5a7
No known key found for this signature in database
GPG Key ID: 2D2885533B869ED4
  1. 2
  2. 356
  3. 2
  4. 274

@ -17,7 +17,7 @@ class Document(ABC): # pragma: no cover pylint: disable=too-few-public-methods
timestamp: datetime
signature: str
signature: Optional[str]
def sign(self, identity: Optional[Identity] = None) -> None:

@ -0,0 +1,356 @@
"""Format validator for raw (JSON) documents in the es.4 format"""
from datetime import datetime, timezone
from hashlib import sha256
from typing import Any, Dict, Mapping, Optional, Tuple, TypedDict
import fastjsonschema
from ..base32 import base32_bytes_to_string
from ..exc import ValidationError
from ..identity import Identity
from ..path import Path
from ..share import Share
from ..types import B32_CHAR
from . import Document
class RawDocument(TypedDict, total=False):
"""A raw Document object"""
author: str
signature: str
content: str
contentHash: str
deleteAfter: Optional[int]
path: str
workspace: str
timestamp: int
class Es4Document(Document): # pylint: disable=too-many-instance-attributes
"""Validator for the 'es.4' format
Checks if documents are spec-compliant before ingesting, and signs them according to spec.
# Tolerance for accepting messages from the future (because of clock skew between peers)
FUTURE_CUTOFF_MICROSECONDS = 600000000 # 10 minutes
# Allowed valid range of timestamps (in microseconds, not milliseconds); these are taken from
# the reference implementation, but they are not specced.
MIN_TIMESTAMP = 10000000000000 # 1983-01-02T06:57:53Z
MAX_TIMESTAMP = 9007199254740990 # 2255-06-05T23:47:34.740990Z
# 4 million bytes = 4 megabytes (measured as bytes, not as utf-8 characters)
'title': 'Earthstar document format es.4',
'description': 'This schema describes a valid Earthstar document in the es.4 format',
'type': 'object',
'properties': {
'format': {
'description': 'The format identifier. Must be "es.4"',
'enum': ['es.4'],
'author': {
'type': 'string',
'pattern': Identity.PATTERN,
'content': {
'type': 'string',
'contentHash': {
'type': 'string',
'pattern': f'^b[{B32_CHAR}]{{52}}$',
'deleteAfter': {
'oneOf': [
'type': 'integer',
'minimum': MIN_TIMESTAMP,
'maximum': MAX_TIMESTAMP,
'type': 'null',
'path': {
'type': 'string',
'pattern': Path.PATTERN,
'minLength': 2,
'maxLength': 512,
'signature': {
'type': 'string',
'pattern': f'^b[{B32_CHAR}]{{103}}$',
'timestamp': {
'type': 'integer',
'minimum': MIN_TIMESTAMP,
'maximum': MAX_TIMESTAMP,
'workspace': {
'type': 'string',
'pattern': Share.PATTERN,
'required': [
'additionalProperties': False,
__document_format__ = 'es.4'
_validator = staticmethod(fastjsonschema.compile(CORE_SCHEMA))
def __init__( # pylint: disable=too-many-arguments
author: Identity,
share: Share,
path: Path,
timestamp: Optional[datetime] = None,
content: Optional[str] = None,
content_hash: Optional[str] = None,
signature: Optional[str] = None,
delete_after: Optional[datetime] = None,
): Identity = author
self.path = path
self.signature = signature
self._content = content or ''
self._content_hash = content_hash
self.delete_after = delete_after
self.share = share
self.timestamp = timestamp or datetime.utcnow()
def from_json(cls, raw_document: Dict[str, Any]) -> 'Es4Document':
"""Validate raw_document as an es.4 document and create an ``Es4Document`` from it
:returns: a new ``Es4Document``
:raises ValidationError: if anything is wrong
# do this first to ensure we have all the right datatypes in the right fields
# this is the most likely to fail under regular conditions, so do it next (because of
# clock skew and expired ephemeral documents)
timestamp = datetime.utcfromtimestamp(raw_document['timestamp'] / 1000000).replace(
if raw_document['deleteAfter']:
delete_after = datetime.utcfromtimestamp(raw_document['deleteAfter'] / 1000000).replace(
delete_after = None
author = Identity.from_address(raw_document['author'])
share = Share.from_address(raw_document['workspace'])
path = Path(raw_document['path'])
if not path.can_write(author):
raise ValidationError(f'Author {author} cannot write to path {path}')
# do this last since it might be slow on a large document
cls._check_content_matches_hash(raw_document['content'], raw_document['contentHash'])
document = cls(
return document
def content_hash(self) -> str:
"""The hash of the document’s content"""
if not self._content_hash:
self._content_hash = self.hash_value(self.content)
return self._content_hash
def hash_value(value: str) -> str:
"""Calculate the SHA-256 digest of the value and return its Base32 representation"""
hasher = sha256()
return base32_bytes_to_string(hasher.digest())
def content(self) -> str:
"""The content of the document"""
return self._content
def content(self, value: str) -> None:
self._content = value
self._content_hash = self.hash_value(value)
def generate_hash(self) -> str:
"""Calculate the hash of the document
This hash will be signed with the authors key to propagate the signature property."""
hash_keys: Dict[str, Any] = {
'contentHash': self.content_hash,
'deleteAfter': (
int(self.delete_after.timestamp() * 1000000) if self.delete_after else None
'format': self.__document_format__,
'path': self.path,
'timestamp': int(self.timestamp.timestamp() * 1000000),
'workspace': self.share,
hasher = sha256()
for key, value in sorted(hash_keys.items(), key=lambda elem: elem[0]):
if value is None:
return base32_bytes_to_string(hasher.digest())
def sign(self, identity: Optional[Identity] = None) -> None:
if identity and identity !=
raise ValidationError(
"when signing a document, keypair address must match document author"
if identity: = identity
self.signature =
def remove_extra_fields(cls, doc: Dict[str, Any]) -> Tuple[RawDocument, Mapping[str, Any]]:
"""Return a copy of the doc without extra fields, plus the extra fields as a separate object
This should be run before check_document_is_valid. The output doc will be more likely to
be valid once the extra fields have been removed.
:raises ValidationError: if the inpuc is not a plain JSON object
if not isinstance(doc, dict):
raise ValidationError('Document is not a plain JSON object')
valid_keys = set(cls.CORE_SCHEMA['properties'].keys()) # type: ignore
doc2: RawDocument = {}
extras: Mapping[str, Any] = {}
for (key, val) in doc.items():
if key in valid_keys:
doc2[key] = val # type: ignore
if not key.startswith('_'):
raise ValidationError(
"extra document fields must have names starting with an underscore"
extras[key] = val # type: ignore
return (doc2, extras)
# These are broken out for easier unit testing. They will not normally be used directly; use
# the main assertDocumentIsValid instead.
def _check_basic_document_validity(cls, doc: Dict[str, Any]) -> None:
"""Check for correct fields and datatypes"""
except BaseException as exc:
raise ValidationError(exc) from None
# TODO: is there more to check?
def _check_timestamp_is_ok(cls, timestamp: int, delete_after: Optional[int]) -> None:
"""Check for valid timestamp, and expired ephemeral documents
timestamp and deleteAfter are already verified as good numbers by the schema checker:
- in the right range of min and max allowed timestamps
- integers, and not NaN or infinity
Timestamp must not be too far in the future (but allow some clock skew).
now = int(datetime.utcnow().timestamp() * 1000000)
if timestamp > now + cls.FUTURE_CUTOFF_MICROSECONDS:
raise ValidationError("timestamp too far in the future")
# Ephemeral documents
if delete_after is not None:
# Only valid if expiration date is in the future
if now > delete_after:
raise ValidationError("ephemeral doc has expired")
# Can't expire before it was created, that makes no sense
if delete_after <= timestamp:
raise ValidationError("ephemeral doc expired before it was created")
def validate_signature(self) -> None:
"""Check if the signature is good"""
doc_hash = self.generate_hash()
except BaseException as exc:
raise ValidationError(f'Cannot check signature: {exc}') from None
if not self.signature:
raise ValidationError('document has no signature assigned')
if not, self.signature):
raise ValidationError('signature is invalid')
def _check_content_matches_hash(content: str, content_hash: str) -> None:
"""Ensure the contentHash matches the actual content"""
hasher = sha256()
calculated_hash = base32_bytes_to_string(hasher.digest())
if content is not None and calculated_hash != content_hash:
raise ValidationError("content does not match contentHash")

@ -12,7 +12,7 @@ class Path:
"""A document path"""
_SEGMENT_PATTERN = f'/[{PATH_CHARACTER}]+'.replace('$', '\\$')
def __init__(self, path: str) -> None:
self.validate(path, allow_ephemeral=True)

@ -0,0 +1,274 @@
"""Tests for the es.4 document validator"""
from datetime import datetime, timedelta
from typing import Any, Dict
from ed25519 import SigningKey
import pytest
from pytest_mock.plugin import MockerFixture
from earthsnake.exc import ValidationError
from earthsnake.document.es4 import Es4Document
from earthsnake.identity import Identity
from earthsnake.path import Path
from earthsnake.share import Share
VALID_DOCUMENT: Dict[str, Any] = {
'format': 'es.4',
'timestamp': 1651738871668993,
'deleteAfter': None,
'author': '@test.bcz76z52y5dlpohtkmpuj3jsdcvfmebzpcgfmtmhu4u7hlexzreya',
'path': '/test.txt',
'workspace': '+test.suffix',
'signature': (
'content': 'test',
'contentHash': 'bt6dnbamijr6wlgrp5kqmkwwqcwr36ty3fmfyelgrlvwblmhqbiea',
Es4Document.CORE_SCHEMA['required'], # type: ignore
def test_validate_missing_keys(missing: str) -> None:
"""Test if validation fails when a required key is missing"""
raw_document = VALID_DOCUMENT.copy()
del raw_document[missing]
with pytest.raises(ValidationError) as ctx:
required_field_names = ', '.join(
repr(x) for x in Es4Document.CORE_SCHEMA['required'] # type: ignore
assert str(ctx.value) == f"data must contain [{required_field_names}] properties"
def test_validate_future_document() -> None:
"""Test if validation fails when the document is created far in the future"""
raw_document = VALID_DOCUMENT.copy()
raw_document['timestamp'] = int(
(datetime.utcnow() + timedelta(minutes=20)).timestamp() * 1000000
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'timestamp too far in the future'
def test_validate_expiry_before_creation() -> None:
"""Test if validation fails when the document is set to expire before it has been created"""
raw_document = VALID_DOCUMENT.copy()
raw_document['timestamp'] = int(
(datetime.utcnow() + timedelta(minutes=5)).timestamp() * 1000000
raw_document['deleteAfter'] = raw_document['timestamp'] - 1
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'ephemeral doc expired before it was created'
def test_validate_deleteafter_in_past() -> None:
"""Test if validation fails if deleteAfter is in the pas"""
raw_document = VALID_DOCUMENT.copy()
yesterday = datetime.utcnow() - timedelta(days=1)
raw_document['deleteAfter'] = int(yesterday.timestamp() * 1000000)
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'ephemeral doc has expired'
def test_validate_not_writable() -> None:
"""Test if validation fails when the author is not allowed to write at the given path"""
raw_document = VALID_DOCUMENT.copy()
raw_document['path'] = '/test/~@some.' + ('a' * 52)
with pytest.raises(ValidationError) as ctx:
assert (
== f'Author {raw_document["author"]} cannot write to path {raw_document["path"]}'
def test_validate() -> None:
"""Test if validation succeeds on a valid document"""
document = Es4Document.from_json(VALID_DOCUMENT)
assert isinstance(document, Es4Document)
def test_validate_ephemeral() -> None:
"""Test if validation succeeds with a valid deleteAfter value"""
raw_document = VALID_DOCUMENT.copy()
raw_document['deleteAfter'] = 9007199254740990
raw_document['signature'] = (
document = Es4Document.from_json(raw_document)
assert isinstance(document, Es4Document)
def test_signature_check_error(mocker: MockerFixture) -> None:
"""Test if validation fails if something goes wrong during signature checking"""
side_effect=ValueError('test error'),
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'Cannot check signature: test error'
def test_signature_check_bad_signature() -> None:
"""Test if validation fails if the document’s signature doesn’t match the document itself"""
raw_document = VALID_DOCUMENT.copy()
raw_document['signature'] = 'b' * 104
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'signature is invalid'
def test_remove_extra_fields_no_underscore() -> None:
"""Test if remove_extra_fields() fails on non-specced fields not starting with an underscore"""
raw_document = VALID_DOCUMENT.copy()
raw_document.update({'test': 'yes'})
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'extra document fields must have names starting with an underscore'
def test_remove_extra_fields_non_object() -> None:
"""Test if remove_extra_fields() fails if the validated document is not a JSON object"""
with pytest.raises(ValidationError) as ctx:
Es4Document.remove_extra_fields([]) # type: ignore
assert str(ctx.value) == 'Document is not a plain JSON object'
def test_remove_extra_fields() -> None:
"""Test if remove_extra_fields() correctly extracts field names starting with an underscore"""
raw_document = VALID_DOCUMENT.copy()
raw_document.update({'_test': True})
cleaned_doc, removed = Es4Document.remove_extra_fields(raw_document)
assert cleaned_doc == VALID_DOCUMENT
assert removed == {'_test': True}
def test_check_content_hash() -> None:
"""Test if validation fails if contentHash is not the hash of content"""
raw_document = VALID_DOCUMENT.copy()
raw_document['content'] = 'other'
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'content does not match contentHash'
def test_sign_different_author(identity: Identity) -> None:
"""Test if sign() fails if the signing identity is not the same as the document author"""
document = Es4Document.from_json(VALID_DOCUMENT)
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'when signing a document, keypair address must match document author'
def test_sign_new_identity() -> None:
"""Test if signing a document with a different entity also sets the author"""
key_seed = (
key = SigningKey(key_seed)
identity = Identity('name', sign_key=key)
document = Es4Document.from_json(VALID_DOCUMENT) = identity
assert (
== 'bk76oxkhbydy3itajeeqtryyj7ej5y7hqjffae6ilf4kpyklh2w32hx3ndg6cb3zvlphj46zmuxbetk4cj2fh6'
assert == identity
def test_sign(identity: Identity) -> None:
"""Test if the sign() method updates the document with a correct signature"""
document = Es4Document.from_json(VALID_DOCUMENT)
assert document.signature == (
def test_content_setter() -> None:
"""Check if the content setter recalculates content_hash, too"""
document = Es4Document.from_json(VALID_DOCUMENT)
document.content = 'new content'
assert document.content_hash != VALID_DOCUMENT['contentHash']
assert document.content_hash == 'b7yzgbde66w3m67r7srsiajj765xsj5hmaz4phuhqp6mejs77syaq'
def test_content_hash_getter() -> None:
"""Test if the content_hash getter recalculates the content hash if it’s not already set"""
document = Es4Document.from_json(VALID_DOCUMENT)
document._content = 'new content' # pylint: disable=protected-access
document._content_hash = None # pylint: disable=protected-access
assert document.content_hash == 'b7yzgbde66w3m67r7srsiajj765xsj5hmaz4phuhqp6mejs77syaq'
def test_validate_unsigned_document(identity: Identity) -> None:
"""Test if signature validation fails if there is no signature yet"""
document = Es4Document(identity, Share.from_address('+test.share'), Path('/test'))
with pytest.raises(ValidationError) as ctx:
assert str(ctx.value) == 'document has no signature assigned'