[Draft] Replica

This commit is contained in:
Gergely Polonkai 2022-05-03 15:27:54 +02:00
parent e125ef6cdb
commit 61b4270332
No known key found for this signature in database
GPG Key ID: 2D2885533B869ED4
2 changed files with 630 additions and 0 deletions

View File

@@ -11,3 +11,7 @@ class EarthsnakeError(Exception):
class ValidationError(EarthsnakeError):
    """Raised when something doesnt pass as a valid Earthsnake object"""
class ReplicaIsClosedError(EarthsnakeError):
    """Raised when a ReplicaBase or ReplicaDriverBase object is used after ``close()`` was called on it"""

626
earthsnake/replica.py Normal file
View File

@@ -0,0 +1,626 @@
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from enum import Enum, auto
import logging
from typing import Dict, List, Literal, Mapping, Optional, Tuple, Union

from .document import Cmp, Document
from .exc import EarthsnakeError, ReplicaIsClosedError, ValidationError
from .identity import Identity
from .path import Path
from .space import Space
logger = logging.getLogger(__name__)
def microsecond_now() -> int:
    """Return the current UTC time as an integer number of microseconds since the epoch.

    Earthsnake document timestamps are expressed in microseconds.

    BUG FIX: the original multiplied the POSIX timestamp (seconds) by 1000,
    which yields milliseconds, not the microseconds the function name promises.
    Also uses a timezone-aware ``datetime`` instead of the deprecated ``utcnow()``.
    """
    return int(datetime.now(timezone.utc).timestamp() * 1_000_000)
class FormatValidatorBase:
    """Placeholder base class for document format validators.

    Concrete validators are expected to provide signing and validation of
    documents; this stub only serves as a common ancestor for type annotations.
    """
class IngestEvent:
    """The outcome of ingesting a document into a replica.

    ``kind`` is one of the class-level constants below.  Depending on the kind,
    callers in this module attach extra fields as keyword arguments (``reason``,
    ``err``, ``doc``, ``max_local_index``, ``doc_is_latest``,
    ``prev_doc_from_same_author``, ``prev_latest_doc``).

    BUG FIX: the original class had no ``__init__`` even though the rest of the
    module instantiates it with keyword arguments (e.g. ``IngestEvent(kind=...,
    reason=..., err=..., max_local_index=...)``); such calls would raise
    ``TypeError``.
    """

    FAILURE = 'failure'
    NOTHING_HAPPENED = 'nothing_happened'
    SUCCESS = 'success'

    def __init__(self, kind: str, **details) -> None:
        """Store ``kind`` plus any event-specific detail fields.

        :param kind: one of ``FAILURE``, ``NOTHING_HAPPENED`` or ``SUCCESS``
        :param details: kind-specific extra fields; common optional ones are
            pre-initialised to ``None`` so readers can access them safely
        """
        self.kind = kind
        # Pre-set the fields the module reads after an ingest attempt, so
        # attribute access never raises for events that omit them
        self.reason: Optional[str] = None
        self.err: Optional[Exception] = None
        self.doc: Optional['Document'] = None
        self.max_local_index: Optional[int] = None

        for key, value in details.items():
            setattr(self, key, value)
class ReplicaBase(ABC):
    """Abstract base class for replicas.

    A replica stores, queries and ingests Earthsnake documents for a single
    share (space), and emits events on its bus.
    """

    # Random ID assigned at construction; identifies this replica instance
    replica_id: str
    # The share (space) this replica stores documents for
    share: Space
    # Validator used to sign and check documents
    format_validator: FormatValidatorBase
    # Event bus for ingest/lifecycle notifications
    bus: SuperBus[ReplicaBusChannel]

    def is_closed(self) -> bool:
        """Returns whether the replica is closed or not"""

    def close(self, erase: bool = False) -> None:
        """Closes the replica, preventing new documents from being ingested or events being emitted

        Any methods called after closing will return ``ReplicaIsClosedError``.

        More details:

        - send ReplicaWillClose events and wait for event receivers to finish blocking
        - close the Replica
        - close the ReplicaDriver and possibly erase it
        - send ReplicaDidClose events and do not wait for event receivers

        Any function called after the replica is closed will throw a
        ``ReplicaIsClosedError``, except ``isClosed()`` which is always allowed.

        You cannot call ``close()`` if the replica is already closed (it will
        throw a ``ReplicaIsClosedError``).

        ``close()`` can happen while ``set()`` or ``ingest()`` are waiting for
        locks or have pending transactions.  In that case the pending
        operations will fail and throw a ``ReplicaIsClosed``.

        If ``erase`` is ``True``, actually delete and forget the local data
        (remove files, etc).  It defaults to ``False`` if not provided.

        :param erase: erase the contents of the replica
        """

    def get_max_local_index(self) -> int:
        """Returns the max local index of all stored documents"""

    # The following should always return frozen docs
    # (fixed comment typo: "show" -> "should")

    def get_docs_after_local_index(
        self,
        history_mode: HistoryMode,
        start_after: int,
        limit: Optional[int]
    ) -> List[Document]:
        """Returns documents whose local index is greater than ``start_after``"""

    def get_all_docs(self) -> List[Document]:
        """Returns all documents, including historical versions of documents by other identities
        """

    def get_latest_docs(self) -> List[Document]:
        """Returns latest document for every path"""

    def get_all_docs_at_path(self, path: Path) -> List[Document]:
        """Returns all versions of a document by different authors from a specific path"""

    def get_latest_doc_at_path(self, path: Path) -> Optional[Document]:
        """Returns the most recently written version of a document at a path

        BUG FIX: the original was named ``get_latest_doc_at_past`` — a typo;
        the concrete implementation below defines ``get_latest_doc_at_path``.
        """

    def query_docs(self, query: Query) -> List[Document]:
        """Returns a list of documents for a given query

        BUG FIX: the original declared this ``async`` although the concrete
        implementation (and every caller in this module) is synchronous.

        my_query = Query(filter={'pathEndsWith': '.txt'}, limit=5)
        first_five_text_docs = my_replica.query_docs(my_query)
        """

    def query_paths(self, query: Optional[Query]) -> List[Document]:
        # NOTE(review): the annotation says List[Document] but the name
        # suggests List[Path] — confirm the intended return type
        pass

    @abstractmethod
    def query_authors(self, query: Optional[Query]) -> List[Identity]:
        pass

    @abstractmethod
    def set(self, identity: Identity, doc_to_set: IncompleteDocument) -> IngestEvent:
        """Adds a new document to the replica

        If a document signed by the same identity exists at the same path, it
        will be overwritten.

        BUG FIX: annotated ``-> IngestEvent`` instead of ``-> None`` to match
        the concrete implementation, which returns the ingest event.
        """

    @abstractmethod
    def ingest(self, doc: Document) -> IngestEvent:
        """Ingest an existing signed document to the replica"""

    @abstractmethod
    def overwrite_all_docs_by_author(self, identity: Identity) -> int:
        """Overwrite every document from this author, including history versions, with an empty doc

        The new docs will have a timestamp of ``old_document.timestamp + 1`` to
        prevent them from jumping to the front of the history and becoming
        latest.

        Documents that are already empty will not be overwritten.

        If an error occurs, this will stop early.

        :returns: the number of documents changed
        :raises ValidationError:
        """
class ReplicaDriverBase(ABC):
    """Base class for replica drivers

    A replica driver provides low-level access to actual replica and is used by
    ReplicaBase to actually load and save data.  ReplicaDrivers are not meant
    to be used directly by users; let the Replica talk to it for you.
    """

    share: Space

    def is_closed(self) -> bool:
        """Returns whether the replica has been closed or not"""

    def close(self, erase: bool = False) -> None:
        """Close the replica driver

        The replica will call this.

        You cannot call ``close()`` if the replica is already closed (it will
        throw a ``ReplicaIsClosedError``).

        :param erase: if ``True``, actually delete and forget data locally.
        """

    # CONSISTENCY FIX: the config API was missing from the base class even
    # though Replica calls these four methods on its driver and
    # ReplicaDriverMemory implements them.

    def get_config(self, key: str) -> Optional[str]:
        """Return the configuration value stored under ``key``, or ``None`` if unset"""

    def set_config(self, key: str, value: str) -> None:
        """Store ``value`` under ``key`` in the driver's configuration"""

    def list_config_keys(self) -> List[str]:
        """Return all available configuration keys"""

    def delete_config(self, key: str) -> bool:
        """Delete ``key`` from the configuration

        :returns: whether the key existed
        """

    def get_max_local_index(self) -> int:
        """The max local index used so far

        The first doc will increment this and get index 1.

        This is synchronous because its expected that the driver will load it
        once at startup and then keep it in memory.
        """

    def query_docs(self, query: Query) -> List[Document]:
        """Returns a list of Documents given a Query"""

    def upsert(self, doc: Document) -> Document:
        """Add or update a signed document

        Do no checks of any kind, just save it to the indices.

        Add a doc, dont enforce any rules on it.

        Overwrite existing doc even if this doc is older.

        :returns: a copy of the document, frozen, with ``local_index`` set.
        """
class Replica(ReplicaBase):
    """A replica backed by a pluggable :class:`ReplicaDriverBase`.

    Signs and validates documents with its format validator, delegates storage
    to the driver, and emits events on its bus.
    """

    def __init__(
        self, space: Union[str, Space], validator: FormatValidatorBase, driver: ReplicaDriverBase
    ) -> None:
        """Create a replica for ``space`` using ``validator`` and ``driver``.

        :param space: the share this replica belongs to; a raw address string
            is converted to a :class:`Space`
        :param validator: format validator used to sign/check documents
        :param driver: storage backend
        """
        self._is_closed = False
        self._ingest_lock: Lock[IngestEvent]

        if isinstance(space, str):
            space = Space(space)

        self.share = space
        self.replica_id = 'replica-' + random_id()
        self.format_validator = validator
        self.bus = Superbus('|')
        self._ingest_lock = Lock()
        self.replica_driver = driver

    def is_closed(self) -> bool:
        # BUG FIX: the original returned ``self.is_closed`` — the bound method
        # itself, which is always truthy — instead of the ``_is_closed`` flag
        return self._is_closed

    def close(self, erase: bool = False) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        # Let subscribers finish their work before we actually close
        self.bus.send_and_wait('will_close')
        self._is_closed = True
        self.replica_driver.close(erase)
        self.bus.send_and_wait('did_close')

    def get_config(self, key: str) -> Optional[str]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.get_config(key)

    def set_config(self, key: str, value: str) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        self.replica_driver.set_config(key, value)

    def list_config_keys(self) -> List[str]:
        """Get all available configuration keys"""
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.list_config_keys()

    def delete_config(self, key: str) -> bool:
        """Delete a key from the configuration"""
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.delete_config(key)

    def get_max_local_index(self) -> int:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.get_max_local_index()

    def get_docs_after_local_index(
        self, history_mode: HistoryMode, start_after: int, limit: Optional[int] = None
    ) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(
            history_mode=history_mode,
            order_by=('local_index', 'ASC'),
            start_after={'local_index': start_after},
            limit=limit,
        )

        return self.replica_driver.query_docs(query)

    def get_all_docs(self) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(history_mode=HistoryMode.ALL, order_by=('path', 'ASC'))

        return self.replica_driver.query_docs(query)

    def get_latest_docs(self) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(history_mode=HistoryMode.LATEST, order_by=('path', 'ASC'))

        return self.replica_driver.query_docs(query)

    def get_all_docs_at_path(self, path: Path) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(history_mode=HistoryMode.ALL, order_by=('path', 'ASC'), filter={'path': path})

        return self.replica_driver.query_docs(query)

    def get_latest_doc_at_path(self, path: Path) -> Optional[Document]:
        # BUG FIX: annotated ``-> Optional[Document]``; the original said
        # ``-> Document`` yet returns ``None`` when no document exists
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(
            history_mode=HistoryMode.LATEST, order_by=('path', 'ASC'), filter={'path': path}
        )
        docs = self.replica_driver.query_docs(query)

        if not docs:
            return None

        return docs[0]

    def query_docs(self, query: Query) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.query_docs(query)

    def set(self, identity: Identity, doc_to_set: IncompleteDocument) -> IngestEvent:
        if self._is_closed:
            raise ReplicaIsClosedError()

        timestamp: int

        # BUG FIX: the original tested ``doc_to_set is not None``, which made
        # the timestamp-generation branch unreachable (and crashing when taken,
        # since it dereferences ``doc_to_set``).  The intent is to generate a
        # timestamp only when the caller did not supply one: one microsecond
        # past the latest existing version, or "now", whichever is later.
        if doc_to_set.timestamp is not None:
            timestamp = doc_to_set.timestamp
        else:
            latest_doc_same_path = self.get_latest_doc_at_path(doc_to_set.path)

            if latest_doc_same_path is None:
                timestamp = microsecond_now()
            else:
                timestamp = max(microsecond_now(), latest_doc_same_path.timestamp + 1)

        doc = Document(
            format='es.4',
            author=identity,
            content=doc_to_set.content,
            content_hash=hash_doc(doc_to_set.content),
            delete_after=doc_to_set.delete_after,
            # NOTE(review): the original did not pass the path, but ingest()
            # reads ``doc.path`` — confirm against Document's constructor
            path=doc_to_set.path,
            timestamp=timestamp,
            space=self.share,
            signature='?',  # placeholder; replaced by sign_document below
        )

        try:
            signed_doc = self.format_validator.sign_document(identity, doc)
        except EarthsnakeError as exc:
            return IngestEvent(
                kind=IngestEvent.FAILURE,
                reason='invalid_document',
                err=exc,
                max_local_index=self.replica_driver.get_max_local_index(),
            )

        # BUG FIX: the original referenced the undefined name ``signed_document``
        return self.ingest(signed_doc)

    def ingest(self, doc_to_ingest: Document) -> IngestEvent:
        if self._is_closed:
            raise ReplicaIsClosedError()

        try:
            doc_to_ingest, extra_fields = self.format_validator.remove_extra_fields(doc_to_ingest)
        except EarthsnakeError as exc:
            return IngestEvent(
                kind=IngestEvent.FAILURE,
                reason='invalid_document',
                err=exc,
                max_local_index=self.replica_driver.get_max_local_index(),
            )

        if extra_fields:
            # BUG FIX: the original passed a plain string containing
            # "{extra_fields}" (missing the f-prefix); use lazy %-style args
            logger.debug('extra fields found: %s', extra_fields)

        try:
            self.format_validator.check_document_is_valid(doc_to_ingest)
        except EarthsnakeError as exc:
            return IngestEvent(
                kind=IngestEvent.FAILURE,
                reason='invalid_document',
                err=exc,
                max_local_index=self.replica_driver.get_max_local_index(),
            )

        def write_to_driver_with_lock() -> IngestEvent:
            # Get the other docs at the same path to decide whether the new
            # doc becomes latest, and whether it is obsolete or a duplicate
            existing_docs_same_path = self.get_all_docs_at_path(doc_to_ingest.path)
            prev_latest = existing_docs_same_path[0] if existing_docs_same_path else None
            prev_same_author_list = [
                doc for doc in existing_docs_same_path if doc.author == doc_to_ingest.author
            ]
            prev_same_author = prev_same_author_list[0] if prev_same_author_list else None

            existing_docs_same_path.append(doc_to_ingest)
            # Newest first; ties broken by signature for a deterministic order
            existing_docs_same_path.sort(key=lambda doc: (-doc.timestamp, doc.signature))

            is_latest = existing_docs_same_path[0] == doc_to_ingest

            if not is_latest and prev_same_author:
                doc_comp = Document.compare_newest_first(doc_to_ingest, prev_same_author)

                if doc_comp == Cmp.GT:
                    return IngestEvent(
                        kind=IngestEvent.NOTHING_HAPPENED,
                        reason='obsolete_from_same_author',
                        doc=doc_to_ingest,
                        max_local_index=self.replica_driver.get_max_local_index(),
                    )

                if doc_comp == Cmp.EQ:
                    return IngestEvent(
                        kind=IngestEvent.NOTHING_HAPPENED,
                        reason='already_had_it',
                        doc=doc_to_ingest,
                        max_local_index=self.replica_driver.get_max_local_index(),
                    )

            # TODO: pass existingDocsSamePath to save another lookup
            doc_as_written = self.replica_driver.upsert(doc_to_ingest)
            max_local_index = self.replica_driver.get_max_local_index()

            return IngestEvent(
                kind=IngestEvent.SUCCESS,
                max_local_index=max_local_index,
                doc_is_latest=is_latest,
                prev_doc_from_same_author=prev_same_author,
                prev_latest_doc=prev_latest,
            )

        ingest_event = self._ingest_lock.run(write_to_driver_with_lock)
        self.bus.send_and_wait(f'ingest|{doc_to_ingest.path}', ingest_event)

        return ingest_event

    def overwrite_all_docs_by_author(self, identity: Identity) -> int:
        if self._is_closed:
            raise ReplicaIsClosedError()

        # TODO: do this in batches
        query = Query(history_mode=HistoryMode.ALL, filter={'author': identity})
        docs_to_overwrite = self.query_docs(query)
        num_overwritten = 0
        num_already_empty = 0

        for doc in docs_to_overwrite:
            if not doc.content:
                num_already_empty += 1

                continue

            self.format_validator.remove_extra_fields(doc)

            # NOTE(review): the replacement carries only content/timestamp/
            # signature; confirm Document fills path/author/space defaults
            empty_doc = Document(
                content='', content_hash=hash_doc(''), timestamp=doc.timestamp + 1, signature='?'
            )
            signed_doc = self.format_validator.sign_document(identity, empty_doc)
            ingest_event = self.ingest(signed_doc)

            if ingest_event.kind == IngestEvent.FAILURE:
                raise ValidationError(
                    'ingestion error during overwrite_all_docs_by_same_author '
                    f'{ingest_event.reason}: {ingest_event.err}'
                )

            if ingest_event.kind == IngestEvent.NOTHING_HAPPENED:
                raise ValidationError(
                    'ingestion did nothing during overwrite_all_docs_by_same_author: '
                    f'{ingest_event.reason}'
                )

            # BUG FIX: the original incremented the misspelled (and undefined)
            # name ``num_overwritter``, raising NameError on first overwrite
            num_overwritten += 1

        return num_overwritten
class ReplicaDriverMemory(ReplicaDriverBase):
    """In-memory replica driver; all data is lost when the process exits."""

    def __init__(self, space: Space) -> None:
        self._is_closed = False
        # -1 so the first upserted doc gets local index 0 after increment
        self._max_local_index = -1
        self._config_kv: Dict[str, str] = {}
        # CONSISTENCY FIX: the base class declares ``share``; keep ``space``
        # as an alias for backward compatibility
        self.share = space
        self.space = space
        self.doc_by_path_and_author: Dict[str, Document] = {}
        self.docs_by_path_newest_first: Dict[Path, List[Document]] = {}

    def is_closed(self) -> bool:
        return self._is_closed

    def close(self, erase: bool = False) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        if erase:
            self._config_kv = {}
            self._max_local_index = -1
            self.docs_by_path_newest_first.clear()
            self.doc_by_path_and_author.clear()

        self._is_closed = True

    def get_config(self, key: str) -> Optional[str]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self._config_kv.get(key)

    def set_config(self, key: str, value: str) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        self._config_kv[key] = value

    def list_config_keys(self) -> List[str]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return sorted(self._config_kv)

    def delete_config(self, key: str) -> bool:
        if self._is_closed:
            raise ReplicaIsClosedError()

        had = (key in self._config_kv)

        if had:
            del self._config_kv[key]

        return had

    def get_max_local_index(self) -> int:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self._max_local_index

    def _get_all_docs(self) -> List[Document]:
        """Every stored version of every document, in arbitrary order"""
        if self._is_closed:
            raise ReplicaIsClosedError()

        # BUG FIX: the original returned the ``dict_values`` view, which has
        # no ``.sort`` and violates the ``List[Document]`` annotation
        return list(self.doc_by_path_and_author.values())

    def _get_latest_docs(self) -> List[Document]:
        """The newest version of each document (first entry of each per-path list)"""
        if self._is_closed:
            raise ReplicaIsClosedError()

        return [doc_list[0] for doc_list in self.docs_by_path_newest_first.values()]

    def query_docs(self, query: Query) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        will_match = query.pre_evaluate()

        if will_match == Query.MATCH_NOTHING:
            return []

        docs = (
            self._get_all_docs()
            if query.history_mode == HistoryMode.ALL
            else self._get_latest_docs()
        )

        if query.order_by == ('path', 'ASC'):
            docs.sort(key=lambda doc: (doc.path, -doc.timestamp, doc.signature))
        elif query.order_by == ('path', 'DESC'):
            # Unfortunately, there is no way to sort by a string value in a
            # descending order in one pass; rely on stable sorts instead
            docs.sort(key=lambda doc: doc.signature)
            docs.sort(key=lambda doc: doc.timestamp, reverse=True)
            # BUG FIX: the original passed the misspelled keyword ``revers=True``
            docs.sort(key=lambda doc: doc.path, reverse=True)
        elif query.order_by == ('local_index', 'ASC'):
            docs.sort(key=lambda doc: doc._local_index)
        elif query.order_by == ('local_index', 'DESC'):
            # BUG FIX: the original read the misspelled attribute ``_lotal_index``
            docs.sort(key=lambda doc: doc._local_index, reverse=True)
        else:
            raise ValidationError(f'unrecognized query order {", ".join(query.order_by)}')

        filtered_docs: List[Document] = []

        for doc in docs:
            # Skip docs before the ``start_after`` cursor, per sort order
            if (
                query.order_by == ('path', 'ASC')
                and query.start_after
                and query.start_after.path and doc.path <= query.start_after.path
            ):
                continue

            if (
                query.order_by == ('path', 'DESC')
                and query.start_after
                and query.start_after.path and doc.path >= query.start_after.path
            ):
                continue

            if (
                query.order_by == ('local_index', 'ASC')
                and query.start_after
                and query.start_after.local_index is not None
                and (doc._local_index or 0) <= query.start_after.local_index
            ):
                continue

            if (
                query.order_by == ('local_index', 'DESC')
                and query.start_after
                and query.start_after.local_index is not None
                and (doc._local_index or 0) >= query.start_after.local_index
            ):
                continue

            if query.filter and not query.match_doc(doc):
                continue

            filtered_docs.append(doc)

            if query.limit is not None and len(filtered_docs) >= query.limit:
                break

        return filtered_docs

    def upsert(self, document: Document) -> Document:
        if self._is_closed:
            raise ReplicaIsClosedError()

        document = document.copy()
        self._max_local_index += 1
        document._local_index = self._max_local_index
        document.freeze()

        self.doc_by_path_and_author[f'{document.path}|{document.author}'] = document

        # Replace any previous version by the same author at this path, then
        # re-sort so index 0 stays the newest version
        docs_by_path = [
            doc
            for doc in self.docs_by_path_newest_first.get(document.path, [])
            if document.author != doc.author
        ]
        docs_by_path.append(document)
        docs_by_path.sort(key=lambda doc: (doc.path, -doc.timestamp, doc.signature))
        self.docs_by_path_newest_first[document.path] = docs_by_path

        return document