[Draft] in-memory replica

This commit is contained in:
Gergely Polonkai 2022-05-06 14:11:28 +02:00
parent 9517f53473
commit a6c3d1f029
No known key found for this signature in database
GPG Key ID: 2D2885533B869ED4
2 changed files with 94 additions and 1 deletions

View File

@ -1,6 +1,7 @@
"""Replica related things"""
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import List, Optional
from ..document import Document
@ -10,6 +11,11 @@ from ..query import Query
from ..share import Share
class HistoryMode(Enum):
ALL = auto()
LATEST = auto()
class IngestEvent:
pass
@ -44,7 +50,12 @@ class Replica(ABC):
"""Returns the maximum local index of all stored documents"""
@abstractmethod
def get_docs_after_local_index(self) -> List[Document]:
def get_docs_after_local_index(
self,
history_mode: HistoryMode,
start_after: int,
limit: Optional[int] = None,
) -> List[Document]:
"""Get all documents after a specific local index"""
@abstractmethod

View File

@ -0,0 +1,82 @@
from typing import Dict, List, Tuple
from ..document import Document
from ..exc import ReplicaIsClosedError
from ..identity import Identity
from ..query import HistoryMode, Query
from ..path import Path
from ..share import Share
from . import Replica
class InMemoryReplica(Replica):
def __init__(self, share: Share, **driver_kwargs):
self.share = share
self._is_closed = False
self._max_local_index = -1
# Local Index <=> Document pairs
self._documents: List[Tuple[int, Document]] = {}
@property
def is_closed(self):
return self._is_closed
def close(self, erase: bool = False):
if erase:
self._local_index = -1
self._documents = []
self._is_closed = True
@property
def max_local_index(self) -> int:
return self._max_local_index
def _get_all_docs(self) -> List[Document]:
if self._is_closed:
raise ReplicaIsClosedError()
return [document for _, document in self._documents]
def _get_latest_docs(self) -> List[Document]:
if self._is_closed:
raise ReplicaIsClosedError()
docs_by_path: Dict[str, Document] = {}
for document in self._documents:
if (
str(document.path) not in docs_by_path
or docs_by_path[str(document.path)].timestamp <= document.timestamp
):
docs_by_path[str(document.path)] = document
return list(docs_by_path.values())
def query_docs(self, query: Query) -> List[Document]:
if self._is_closed:
raise ReplicaIsClosedError()
if query.history_mode == HistoryMode.ALL:
docs = self._get_all_docs()
else:
docs = self._get_latest_docs()
docs_to_local_index = {
document: local_index for document, local_index in self.docs_by_local_index
}
return query({docs_to_local_index[document]: document for document in docs})
def upsert(self, new_document: Document) -> None:
if self._is_closed:
raise ReplicaIsClosedError()
self._local_index += 1
self._documents = [
(local_index, document)
for local_index, document in self._documents
if document.author != new_document.author or document.path != new_document.path
]
self._documents.append((self._local_index, new_document))