From 61b4270332ab399fa8b418b91cc68a3d7d7e3d34 Mon Sep 17 00:00:00 2001
From: Gergely Polonkai
Date: Tue, 3 May 2022 15:27:54 +0200
Subject: [PATCH] [Draft] Replica

---
 earthsnake/exc.py     |   4 +
 earthsnake/replica.py | 626 ++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 630 insertions(+)
 create mode 100644 earthsnake/replica.py

diff --git a/earthsnake/exc.py b/earthsnake/exc.py
index 0c88cc3..e1311f5 100644
--- a/earthsnake/exc.py
+++ b/earthsnake/exc.py
@@ -11,3 +11,7 @@ class EarthsnakeError(Exception):
 
 class ValidationError(EarthsnakeError):
     """Raised when something doesn’t pass as a valid Earthsnake object"""
+
+
+class ReplicaIsClosedError(EarthsnakeError):
+    """A ReplicaBase or ReplicaDriverBase object was used after close() was called on it"""

diff --git a/earthsnake/replica.py b/earthsnake/replica.py
new file mode 100644
index 0000000..eaae0df
--- /dev/null
+++ b/earthsnake/replica.py
@@ -0,0 +1,626 @@
from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum, auto
import logging
from typing import Dict, List, Optional, Union

from .document import Cmp, Document
from .exc import EarthsnakeError, ReplicaIsClosedError, ValidationError
from .identity import Identity
from .path import Path
from .space import Space

logger = logging.getLogger(__name__)

# TODO: the following names are used below but are not defined or imported yet in this
# draft: Superbus, ReplicaBusChannel, Query, IncompleteDocument, Lock, random_id, hash_doc


def microsecond_now() -> int:
    """Returns the current UTC time as microseconds since the Unix epoch"""

    return int(datetime.utcnow().timestamp() * 1_000_000)


class FormatValidatorBase:
    pass


class HistoryMode(Enum):
    """Whether a query should see every stored version or only the latest one per path"""

    ALL = auto()
    LATEST = auto()


class IngestEvent:
    """The outcome of an ingest attempt: a ``kind`` plus kind-specific details"""

    FAILURE = 'failure'
    NOTHING_HAPPENED = 'nothing_happened'
    SUCCESS = 'success'

    def __init__(self, kind: str, **details: object) -> None:
        self.kind = kind

        for name, value in details.items():
            setattr(self, name, value)
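

# Sketch of inspecting an ingest result (illustrative only; ``replica`` and
# ``signed_doc`` are assumed to exist, and the detail attributes follow the
# ``IngestEvent`` constructors used further down in this module):
#
#     event = replica.ingest(signed_doc)
#     if event.kind == IngestEvent.SUCCESS:
#         print('stored at local index', event.max_local_index)
#     elif event.kind == IngestEvent.NOTHING_HAPPENED:
#         print('skipped:', event.reason)
#     else:  # IngestEvent.FAILURE
#         raise event.err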


class ReplicaBase(ABC):
    replica_id: str
    share: Space
    format_validator: FormatValidatorBase
    bus: 'Superbus[ReplicaBusChannel]'

    def is_closed(self) -> bool:
        """Returns whether the replica is closed or not"""

    def close(self, erase: bool = False) -> None:
        """Closes the replica, preventing new documents from being ingested or events being emitted

        Any method called after closing raises ``ReplicaIsClosedError``, except ``is_closed()``,
        which is always allowed.

        More details:

        - send ReplicaWillClose events and wait for event receivers to finish blocking
        - close the Replica
        - close the ReplicaDriver and possibly erase it
        - send ReplicaDidClose events and do not wait for event receivers

        You cannot call ``close()`` if the replica is already closed (it raises a
        ``ReplicaIsClosedError``).

        ``close()`` can happen while ``set()`` or ``ingest()`` are waiting for locks or have
        pending transactions.  In that case the pending operations will fail and raise a
        ``ReplicaIsClosedError``.

        If ``erase`` is ``True``, actually delete and forget the local data (remove files,
        etc).  It defaults to ``False``.

        :param erase: erase the contents of the replica
        """

    def get_max_local_index(self) -> int:
        """Returns the highest local index of all stored documents"""

    # The following should always return frozen documents

    def get_docs_after_local_index(
        self,
        history_mode: HistoryMode,
        start_after: int,
        limit: Optional[int] = None,
    ) -> List[Document]:
        """Returns documents whose local index is greater than ``start_after``, in ascending order"""

    def get_all_docs(self) -> List[Document]:
        """Returns all documents, including historical versions of documents by other identities"""

    def get_latest_docs(self) -> List[Document]:
        """Returns the latest document for every path"""

    def get_all_docs_at_path(self, path: Path) -> List[Document]:
        """Returns all versions of a document by different authors from a specific path"""

    def get_latest_doc_at_path(self, path: Path) -> Optional[Document]:
        """Returns the most recently written version of a document at a path"""

    def query_docs(self, query: Query) -> List[Document]:
        """Returns a list of documents for a given query

        my_query = Query(filter={'path_ends_with': '.txt'}, limit=5)
        first_five_text_docs = my_replica.query_docs(my_query)
        """

    def query_paths(self, query: Optional[Query]) -> List[Path]:
        """Returns the distinct paths of all documents matching the query"""

    def query_authors(self, query: Optional[Query]) -> List[Identity]:
        """Returns the distinct authors of all documents matching the query"""

    @abstractmethod
    def set(self, identity: Identity, doc_to_set: IncompleteDocument) -> IngestEvent:
        """Adds a new document to the replica

        If a document signed by the same identity exists at the same path, it will be
        overwritten.
        """

    @abstractmethod
    def ingest(self, doc: Document) -> IngestEvent:
        """Ingest an existing signed document to the replica"""

    @abstractmethod
    def overwrite_all_docs_by_author(self, identity: Identity) -> int:
        """Overwrite every document from this author, including historical versions, with an
        empty document

        The new docs will have a timestamp of ``old_document.timestamp + 1`` to prevent them
        from jumping to the front of the history and becoming latest.

        Documents that are already empty will not be overwritten.

        If an error occurs, this will stop early.

        :returns: the number of documents changed
        :raises ValidationError: if ingesting one of the new empty documents fails or does
            nothing
        """
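

# Usage sketch for the two write paths (illustrative only; ``IncompleteDocument``
# is referenced but not yet defined in this draft, so the argument names below
# are assumptions):
#
#     incomplete = IncompleteDocument(path=Path('/wiki/shared'), content='hello')
#     event = replica.set(my_identity, incomplete)   # signs, timestamps, then ingests
#     event = replica.ingest(already_signed_doc)     # stores an already-signed doc as-is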


class ReplicaDriverBase(ABC):
    """Base class for replica drivers

    A replica driver provides low-level access to the actual replica and is used by ReplicaBase
    to actually load and save data.  Replica drivers are not meant to be used directly by users;
    let the Replica talk to them for you.
    """

    share: Space

    def is_closed(self) -> bool:
        """Returns whether the replica driver has been closed or not"""

    def close(self, erase: bool = False) -> None:
        """Close the replica driver

        The replica will call this.

        You cannot call ``close()`` if the driver is already closed (it raises a
        ``ReplicaIsClosedError``).

        :param erase: if ``True``, actually delete and forget data locally
        """

    def get_config(self, key: str) -> Optional[str]:
        """Returns the configuration value stored under ``key``, if any"""

    def set_config(self, key: str, value: str) -> None:
        """Stores a configuration value under ``key``"""

    def list_config_keys(self) -> List[str]:
        """Lists all stored configuration keys"""

    def delete_config(self, key: str) -> bool:
        """Deletes ``key`` from the configuration, returning whether it existed"""

    def get_max_local_index(self) -> int:
        """The highest local index used so far

        Local indices start at 0; the first document written gets index 0.

        This is synchronous because the driver is expected to load it once at startup and
        then keep it in memory.
        """

    def query_docs(self, query: Query) -> List[Document]:
        """Returns a list of Documents given a Query"""

    def upsert(self, doc: Document) -> Document:
        """Add or update a signed document

        Perform no checks of any kind and enforce no rules; just save the document to the
        indices, overwriting any existing version even if the new document is older.

        :returns: a copy of the document, frozen, with ``local_index`` set
        """


class Replica(ReplicaBase):
    def __init__(
        self, space: Union[str, Space], validator: FormatValidatorBase, driver: ReplicaDriverBase
    ) -> None:
        self._is_closed = False

        if isinstance(space, str):
            space = Space(space)

        self.share = space
        self.replica_id = 'replica-' + random_id()
        self.format_validator = validator
        self.bus = Superbus('|')
        self._ingest_lock: 'Lock[IngestEvent]' = Lock()
        self.replica_driver = driver

    def is_closed(self) -> bool:
        return self._is_closed

    def close(self, erase: bool = False) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        self.bus.send_and_wait('will_close')
        self._is_closed = True
        self.replica_driver.close(erase)
        self.bus.send_and_wait('did_close')

    def get_config(self, key: str) -> Optional[str]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.get_config(key)

    def set_config(self, key: str, value: str) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        self.replica_driver.set_config(key, value)

    def list_config_keys(self) -> List[str]:
        """Get all available configuration keys"""

        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.list_config_keys()

    def delete_config(self, key: str) -> bool:
        """Delete a key from the configuration"""

        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.delete_config(key)

    def get_max_local_index(self) -> int:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.get_max_local_index()

    def get_docs_after_local_index(
        self, history_mode: HistoryMode, start_after: int, limit: Optional[int] = None
    ) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(
            history_mode=history_mode,
            order_by=('local_index', 'ASC'),
            start_after={'local_index': start_after},
            limit=limit,
        )

        return self.replica_driver.query_docs(query)

    def get_all_docs(self) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(history_mode=HistoryMode.ALL, order_by=('path', 'ASC'))

        return self.replica_driver.query_docs(query)

    def get_latest_docs(self) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(history_mode=HistoryMode.LATEST, order_by=('path', 'ASC'))

        return self.replica_driver.query_docs(query)

    def get_all_docs_at_path(self, path: Path) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(
            history_mode=HistoryMode.ALL, order_by=('path', 'ASC'), filter={'path': path}
        )

        return self.replica_driver.query_docs(query)

    def get_latest_doc_at_path(self, path: Path) -> Optional[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        query = Query(
            history_mode=HistoryMode.LATEST, order_by=('path', 'ASC'), filter={'path': path}
        )

        docs = self.replica_driver.query_docs(query)

        if not docs:
            return None

        return docs[0]

    def query_docs(self, query: Query) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self.replica_driver.query_docs(query)
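
    # Pagination sketch using the local-index query path (assumes the draft
    # ``Query`` semantics used in this module; ``-1`` starts before the first
    # document, which has local index 0):
    #
    #     page = replica.get_docs_after_local_index(HistoryMode.ALL, -1, limit=100)
    #     while page:
    #         process(page)
    #         page = replica.get_docs_after_local_index(
    #             HistoryMode.ALL, page[-1]._local_index, limit=100
    #         )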

    def set(self, identity: Identity, doc_to_set: IncompleteDocument) -> IngestEvent:
        if self._is_closed:
            raise ReplicaIsClosedError()

        timestamp: int

        if doc_to_set.timestamp is not None:
            timestamp = doc_to_set.timestamp
        else:
            latest_doc_same_path = self.get_latest_doc_at_path(doc_to_set.path)

            if latest_doc_same_path is None:
                timestamp = microsecond_now()
            else:
                timestamp = max(microsecond_now(), latest_doc_same_path.timestamp + 1)

        doc = Document(
            format='es.4',
            author=identity,
            path=doc_to_set.path,
            content=doc_to_set.content,
            content_hash=hash_doc(doc_to_set.content),
            delete_after=doc_to_set.delete_after,
            timestamp=timestamp,
            space=self.share,
            signature='?',
        )

        try:
            signed_doc = self.format_validator.sign_document(identity, doc)
        except EarthsnakeError as exc:
            return IngestEvent(
                kind=IngestEvent.FAILURE,
                reason='invalid_document',
                err=exc,
                max_local_index=self.replica_driver.get_max_local_index(),
            )

        return self.ingest(signed_doc)

    def ingest(self, doc_to_ingest: Document) -> IngestEvent:
        if self._is_closed:
            raise ReplicaIsClosedError()

        try:
            doc_to_ingest, extra_fields = self.format_validator.remove_extra_fields(doc_to_ingest)
        except EarthsnakeError as exc:
            return IngestEvent(
                kind=IngestEvent.FAILURE,
                reason='invalid_document',
                err=exc,
                max_local_index=self.replica_driver.get_max_local_index(),
            )

        if extra_fields:
            logger.debug('extra fields found: %s', extra_fields)

        try:
            self.format_validator.check_document_is_valid(doc_to_ingest)
        except EarthsnakeError as exc:
            return IngestEvent(
                kind=IngestEvent.FAILURE,
                reason='invalid_document',
                err=exc,
                max_local_index=self.replica_driver.get_max_local_index(),
            )

        def write_to_driver_with_lock() -> IngestEvent:
            existing_docs_same_path = self.get_all_docs_at_path(doc_to_ingest.path)
            prev_latest = existing_docs_same_path[0] if existing_docs_same_path else None
            prev_same_author_list = [
                doc for doc in existing_docs_same_path if doc.author == doc_to_ingest.author
            ]
            prev_same_author = prev_same_author_list[0] if prev_same_author_list else None

            existing_docs_same_path.append(doc_to_ingest)
            existing_docs_same_path.sort(key=lambda doc: (-doc.timestamp, doc.signature))

            is_latest = existing_docs_same_path[0] == doc_to_ingest

            if not is_latest and prev_same_author:
                doc_comp = Document.compare_newest_first(doc_to_ingest, prev_same_author)

                if doc_comp == Cmp.GT:
                    return IngestEvent(
                        kind=IngestEvent.NOTHING_HAPPENED,
                        reason='obsolete_from_same_author',
                        doc=doc_to_ingest,
                        max_local_index=self.replica_driver.get_max_local_index(),
                    )

                if doc_comp == Cmp.EQ:
                    return IngestEvent(
                        kind=IngestEvent.NOTHING_HAPPENED,
                        reason='already_had_it',
                        doc=doc_to_ingest,
                        max_local_index=self.replica_driver.get_max_local_index(),
                    )

            # TODO: pass existing_docs_same_path to save another lookup
            doc_as_written = self.replica_driver.upsert(doc_to_ingest)
            max_local_index = self.replica_driver.get_max_local_index()

            return IngestEvent(
                kind=IngestEvent.SUCCESS,
                doc=doc_as_written,
                max_local_index=max_local_index,
                doc_is_latest=is_latest,
                prev_doc_from_same_author=prev_same_author,
                prev_latest_doc=prev_latest,
            )

        ingest_event = self._ingest_lock.run(write_to_driver_with_lock)

        self.bus.send_and_wait(f'ingest|{doc_to_ingest.path}', ingest_event)

        return ingest_event
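
    # Conflict-resolution sketch: within a single path, versions sort newest
    # first by ``(-timestamp, signature)``, so timestamp ties are broken by the
    # lexicographically smaller signature, exactly as ``ingest()`` does above:
    #
    #     versions.sort(key=lambda doc: (-doc.timestamp, doc.signature))
    #     is_latest = versions[0] == incoming_doc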

    def overwrite_all_docs_by_author(self, identity: Identity) -> int:
        if self._is_closed:
            raise ReplicaIsClosedError()

        # TODO: do this in batches

        query = Query(history_mode=HistoryMode.ALL, filter={'author': identity})
        docs_to_overwrite = self.query_docs(query)
        num_overwritten = 0
        num_already_empty = 0

        for doc in docs_to_overwrite:
            if not doc.content:
                num_already_empty += 1

                continue

            doc, _ = self.format_validator.remove_extra_fields(doc)

            empty_doc = Document(
                format='es.4',
                author=identity,
                path=doc.path,
                content='',
                content_hash=hash_doc(''),
                delete_after=doc.delete_after,
                timestamp=doc.timestamp + 1,
                space=self.share,
                signature='?',
            )
            signed_doc = self.format_validator.sign_document(identity, empty_doc)

            ingest_event = self.ingest(signed_doc)

            if ingest_event.kind == IngestEvent.FAILURE:
                raise ValidationError(
                    'ingestion error during overwrite_all_docs_by_author '
                    f'{ingest_event.reason}: {ingest_event.err}'
                )

            if ingest_event.kind == IngestEvent.NOTHING_HAPPENED:
                raise ValidationError(
                    'ingestion did nothing during overwrite_all_docs_by_author: '
                    f'{ingest_event.reason}'
                )

            num_overwritten += 1

        return num_overwritten


class ReplicaDriverMemory(ReplicaDriverBase):
    def __init__(self, space: Space) -> None:
        self._is_closed = False
        self._max_local_index = -1
        self._config_kv: Dict[str, str] = {}
        self.share = space
        self.doc_by_path_and_author: Dict[str, Document] = {}
        self.docs_by_path_newest_first: Dict[Path, List[Document]] = {}

    def is_closed(self) -> bool:
        return self._is_closed

    def close(self, erase: bool = False) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        if erase:
            self._config_kv = {}
            self._max_local_index = -1
            self.docs_by_path_newest_first.clear()
            self.doc_by_path_and_author.clear()

        self._is_closed = True

    def get_config(self, key: str) -> Optional[str]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self._config_kv.get(key)

    def set_config(self, key: str, value: str) -> None:
        if self._is_closed:
            raise ReplicaIsClosedError()

        self._config_kv[key] = value

    def list_config_keys(self) -> List[str]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return sorted(self._config_kv)

    def delete_config(self, key: str) -> bool:
        if self._is_closed:
            raise ReplicaIsClosedError()

        had = key in self._config_kv

        if had:
            del self._config_kv[key]

        return had

    def get_max_local_index(self) -> int:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return self._max_local_index

    def _get_all_docs(self) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return list(self.doc_by_path_and_author.values())

    def _get_latest_docs(self) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        return [doc_list[0] for doc_list in self.docs_by_path_newest_first.values()]
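
    # Descending string sort sketch: a sort key cannot negate a string, but
    # Python's sort is stable, so ``query_docs()`` below builds a multi-key
    # order with a descending string component by sorting from the least
    # significant key to the most significant one:
    #
    #     rows = [('b', 2), ('a', 1), ('b', 1)]
    #     rows.sort(key=lambda row: row[1], reverse=True)  # secondary key first
    #     rows.sort(key=lambda row: row[0], reverse=True)  # primary key last
    #     assert rows == [('b', 2), ('b', 1), ('a', 1)]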

    def query_docs(self, query: Query) -> List[Document]:
        if self._is_closed:
            raise ReplicaIsClosedError()

        will_match = query.pre_evaluate()

        if will_match == Query.MATCH_NOTHING:
            return []

        if query.history_mode == HistoryMode.ALL:
            docs = self._get_all_docs()
        else:
            docs = self._get_latest_docs()

        if query.order_by == ('path', 'ASC'):
            docs.sort(key=lambda doc: (doc.path, -doc.timestamp, doc.signature))
        elif query.order_by == ('path', 'DESC'):
            # Sort keys cannot negate strings, so build the descending order by chaining
            # stable sorts from the least significant key to the most significant one
            docs.sort(key=lambda doc: doc.signature)
            docs.sort(key=lambda doc: doc.timestamp, reverse=True)
            docs.sort(key=lambda doc: doc.path, reverse=True)
        elif query.order_by == ('local_index', 'ASC'):
            docs.sort(key=lambda doc: doc._local_index)
        elif query.order_by == ('local_index', 'DESC'):
            docs.sort(key=lambda doc: doc._local_index, reverse=True)
        else:
            raise ValidationError(f'unrecognized query order {", ".join(query.order_by)}')

        filtered_docs: List[Document] = []

        for doc in docs:
            if (
                query.order_by == ('path', 'ASC')
                and query.start_after
                and query.start_after.path
                and doc.path <= query.start_after.path
            ):
                continue

            if (
                query.order_by == ('path', 'DESC')
                and query.start_after
                and query.start_after.path
                and doc.path >= query.start_after.path
            ):
                continue

            if (
                query.order_by == ('local_index', 'ASC')
                and query.start_after
                and query.start_after.local_index is not None
                and (doc._local_index or 0) <= query.start_after.local_index
            ):
                continue

            if (
                query.order_by == ('local_index', 'DESC')
                and query.start_after
                and query.start_after.local_index is not None
                and (doc._local_index or 0) >= query.start_after.local_index
            ):
                continue

            if query.filter and not query.match_doc(doc):
                continue

            filtered_docs.append(doc)

            if query.limit is not None and len(filtered_docs) >= query.limit:
                break

        return filtered_docs

    def upsert(self, document: Document) -> Document:
        if self._is_closed:
            raise ReplicaIsClosedError()

        document = document.copy()

        self._max_local_index += 1
        document._local_index = self._max_local_index
        document.freeze()

        self.doc_by_path_and_author[f'{document.path}|{document.author}'] = document

        docs_by_path = [
            doc
            for doc in self.docs_by_path_newest_first.get(document.path, [])
            if document.author != doc.author
        ]
        docs_by_path.append(document)
        docs_by_path.sort(key=lambda doc: (-doc.timestamp, doc.signature))

        self.docs_by_path_newest_first[document.path] = docs_by_path

        return document
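

# End-to-end usage sketch (hypothetical wiring: ``FormatValidatorBase`` is still
# an empty placeholder, so the signing and validation calls it needs here are
# assumptions about its eventual interface, and the space address is made up):
#
#     space = Space('+gardening.abc123')
#     replica = Replica(space, FormatValidatorBase(), ReplicaDriverMemory(space))
#     event = replica.set(my_identity, incomplete_doc)
#     latest = replica.get_latest_docs()
#     replica.close(erase=True)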