"""Replica related things"""

from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import List, Optional

from ..document import Document
from ..identity import Identity
from ..path import Path
from ..query import Query
from ..share import Share


class HistoryMode(Enum):
    """Selects which document versions a history-aware lookup considers.

    Passed to :meth:`Replica.get_docs_after_local_index`.
    """

    # NOTE(review): presumably "every stored version, including historical
    # ones" — confirm against a concrete replica implementation.
    ALL = auto()
    # NOTE(review): presumably "only the newest version per path" — confirm.
    LATEST = auto()


class IngestEvent:
    """Result type of :meth:`Replica.set` and :meth:`Replica.ingest`.

    Currently an empty placeholder with no attributes or behaviour.
    """


class Replica(ABC):
    """A replica of a share's data

    Used to read, write, and synchronise data to.  Should be closed using the
    ``close()`` method when no longer being used.

    This is an abstract interface: every member except ``__init__`` is
    abstract and must be provided by a concrete subclass.
    """

    def __init__(self, share: Share, **driver_kwargs) -> None:
        """Initialise the replica

        The base implementation does nothing; it only fixes the constructor
        signature for subclasses.

        :param share: the share whose data this replica holds.
        :param driver_kwargs: extra keyword arguments; accepted and ignored
            here (presumably meant for the storage driver of a subclass —
            confirm).
        """

    @property
    @abstractmethod
    def is_closed(self) -> bool:
        """Tells whether the replica is closed or not"""

    @abstractmethod
    def close(self) -> None:
        """Closes the replica, preventing further use

        Any method called after closing will return ``ReplicaIsClosedError``
        """
        # NOTE(review): the wording above says the error is *returned*; it
        # may be intended to be *raised* — confirm with implementations.

    @property
    @abstractmethod
    def max_local_index(self) -> int:
        """Returns the maximum local index of all stored documents"""

    @abstractmethod
    def get_docs_after_local_index(
        self,
        history_mode: HistoryMode,
        start_after: int,
        limit: Optional[int] = None,
    ) -> List[Document]:
        """Get all documents after a specific local index

        :param history_mode: which document versions to consider.
        :param start_after: the local index after which documents are
            returned.
        :param limit: optional cap on the number of documents returned
            (``None`` presumably means "no limit" — confirm).
        :returns: the matching documents.
        """

    @abstractmethod
    def get_all_docs(self) -> List[Document]:
        """Get all documents, including historical versions by other identities"""

    @abstractmethod
    def get_latest_docs(self) -> List[Document]:
        """Get the latest from every path"""

    @abstractmethod
    def get_all_docs_at_path(self, path: Path) -> List[Document]:
        """Get all versions of a document by different authors from a specific path

        :param path: the path to look up.
        """

    @abstractmethod
    def get_latest_doc_at_path(self, path: Path) -> Optional[Document]:
        """Get the most recent version of a document at a specific path

        :param path: the path to look up.
        :returns: the document, or ``None`` (presumably when nothing is
            stored at ``path`` — confirm).
        """

    @abstractmethod
    def query_docs(self, query: Query) -> List[Document]:
        """Get a list of documents matching a given query

        :param query: the query to match documents against.
        """

    @abstractmethod
    def query_paths(self, query: Query) -> List[Path]:
        """Get all document paths where documents match a given query

        :param query: the query to match documents against.
        """

    @abstractmethod
    def query_authors(self, query: Query) -> List[Identity]:
        """Get all document authors where documents match a given query

        :param query: the query to match documents against.
        """

    @abstractmethod
    def set(self, doc_to_set: Document) -> IngestEvent:
        """Add a new document to the replica

        If a document signed by the same identity exists at the same path, it
        will be overwritten.

        :param doc_to_set: the document to add.
        :returns: an event describing the outcome.
        """

    @abstractmethod
    def ingest(self, document: Document) -> IngestEvent:
        """Ingest an existing, signed document to the replica

        :param document: the already signed document to store.
        :returns: an event describing the outcome.
        """

    @abstractmethod
    def overwrite_docs_by_author(self, identity: Identity) -> int:
        """Overwrite every document from this author with an empty doc

        This includes historical versions of documents.

        :param identity: the author whose documents are overwritten.
        :returns: the number of documents changed.
        """


"""Workspace drivers"""

# NOTE(review): this section carries its own docstring and import block, and
# its relative imports use a different depth (``...document``) than the ones
# at the top of the file (``..document``).  It looks like two modules ended up
# in one file — confirm which package this code belongs to.  The imports are
# kept in place and in their original order so nothing changes at import time.
from abc import ABC, ABCMeta
from typing import List, Optional

from ...document import Document
from .. import Workspace
from ...query import Query


class WorkspaceDriverConfig(ABC):
    """Base class for configurable workspace drivers

    Every method here raises ``NotImplementedError``; subclasses are expected
    to override them.  None of them is marked abstract, so the class itself
    can still be instantiated.
    """

    def get_config(self, key: str) -> Optional[str]:
        """Get a configuration value

        :param key: name of the configuration entry.
        :returns: the stored value, or ``None`` (presumably when the key is
            not set — confirm).
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def set_config(self, key: str, value: str) -> None:
        """Set a configuration value

        :param key: name of the configuration entry.
        :param value: the value to store.
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def list_config_keys(self) -> List[str]:
        """List all configuration keys

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def delete_config(self, key: str) -> bool:
        """Delete a configuration value

        :param key: name of the configuration entry.
        :returns: a boolean result (presumably whether something was
            deleted — confirm).
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()


# The metaclass is already ``ABCMeta`` through ``WorkspaceDriverConfig(ABC)``;
# it is spelled out again here exactly as in the original declaration.
class WorkspaceDriverBase(WorkspaceDriverConfig, metaclass=ABCMeta):
    """Base class for workspace drivers

    Extends the configuration interface with document storage operations.
    Every member raises ``NotImplementedError`` until a subclass overrides it.
    """

    # The workspace this driver belongs to; declared only, never assigned here.
    workspace: Workspace

    @property
    def is_closed(self) -> bool:
        """Tells if a workspace is closed

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def close(self, erase: bool) -> None:
        """Close the workspace

        :param erase: presumably whether the stored data should be erased
            while closing — confirm with a concrete driver.
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def get_max_local_index(self) -> int:
        """Get the maximum local index count

        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def query_docs(self, query: Query) -> List[Document]:
        """Query a list of documents

        :param query: the query to match documents against.
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()

    def upsert(self, doc: Document) -> Document:
        """Insert or update a document

        :param doc: the document to insert or update.
        :returns: a document (presumably the stored version — confirm).
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError()