From a6c3d1f029333fc08fbe70c49b1846ea9a7ad674 Mon Sep 17 00:00:00 2001 From: Gergely Polonkai Date: Fri, 6 May 2022 14:11:28 +0200 Subject: [PATCH] [Draft] in-memory replica --- earthsnake/replica/__init__.py | 13 +++++- earthsnake/replica/memory.py | 82 ++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) create mode 100644 earthsnake/replica/memory.py diff --git a/earthsnake/replica/__init__.py b/earthsnake/replica/__init__.py index 23bba6e..a36a689 100644 --- a/earthsnake/replica/__init__.py +++ b/earthsnake/replica/__init__.py @@ -1,6 +1,7 @@ """Replica related things""" from abc import ABC, abstractmethod +from enum import Enum, auto from typing import List, Optional from ..document import Document @@ -10,6 +11,11 @@ from ..query import Query from ..share import Share +class HistoryMode(Enum): + ALL = auto() + LATEST = auto() + + class IngestEvent: pass @@ -44,7 +50,12 @@ class Replica(ABC): """Returns the maximum local index of all stored documents""" @abstractmethod - def get_docs_after_local_index(self) -> List[Document]: + def get_docs_after_local_index( + self, + history_mode: HistoryMode, + start_after: int, + limit: Optional[int] = None, + ) -> List[Document]: """Get all documents after a specific local index""" @abstractmethod diff --git a/earthsnake/replica/memory.py b/earthsnake/replica/memory.py new file mode 100644 index 0000000..86495bd --- /dev/null +++ b/earthsnake/replica/memory.py @@ -0,0 +1,82 @@ +from typing import Dict, List, Tuple + +from ..document import Document +from ..exc import ReplicaIsClosedError +from ..identity import Identity +from ..query import HistoryMode, Query +from ..path import Path +from ..share import Share +from . import Replica + + +class InMemoryReplica(Replica): + def __init__(self, share: Share, **driver_kwargs): + self.share = share + self._is_closed = False + self._max_local_index = -1 + # Local Index <=> Document pairs + self._documents: List[Tuple[int, Document]] = {} + + @property + def is_closed(self): + return self._is_closed + + def close(self, erase: bool = False): + if erase: + self._local_index = -1 + self._documents = [] + + self._is_closed = True + + @property + def max_local_index(self) -> int: + return self._max_local_index + + def _get_all_docs(self) -> List[Document]: + if self._is_closed: + raise ReplicaIsClosedError() + + return [document for _, document in self._documents] + + def _get_latest_docs(self) -> List[Document]: + if self._is_closed: + raise ReplicaIsClosedError() + + docs_by_path: Dict[str, Document] = {} + + for document in self._documents: + if ( + str(document.path) not in docs_by_path + or docs_by_path[str(document.path)].timestamp <= document.timestamp + ): + docs_by_path[str(document.path)] = document + + return list(docs_by_path.values()) + + def query_docs(self, query: Query) -> List[Document]: + if self._is_closed: + raise ReplicaIsClosedError() + + if query.history_mode == HistoryMode.ALL: + docs = self._get_all_docs() + else: + docs = self._get_latest_docs() + + docs_to_local_index = { + document: local_index for document, local_index in self.docs_by_local_index + } + + return query({docs_to_local_index[document]: document for document in docs}) + + def upsert(self, new_document: Document) -> None: + if self._is_closed: + raise ReplicaIsClosedError() + + self._local_index += 1 + + self._documents = [ + (local_index, document) + for local_index, document in self._documents + if document.author != new_document.author or document.path != new_document.path + ] + self._documents.append((self._local_index, new_document))