"""Document querying for replicas""" from datetime import datetime from enum import Enum, auto from typing import Any, Dict, List, Optional, TypedDict from .document import Document from .exc import ValidationError from .identity import Identity from .path import Path class HistoryMode(Enum): """Document query history mode""" ALL = auto() LATEST = auto() class OrderByField(Enum): """Document ordering field""" PATH = auto() LOCAL_INDEX = auto() class OrderDirection(Enum): """Document ordering direction""" ASC = auto() DESC = auto() class Cmp(Enum): """Comparison options""" EQ = auto() LT = auto() GT = auto() class QueryVerdict(Enum): """Basic verdict of a query""" ALL = auto() ALL_LATEST = auto() SOME = auto() NOTHING = auto() class QueryFilter(TypedDict, total=False): """Filtering parameters for a query""" path: Path path_prefix: str path_suffix: str author: Identity timestamp: datetime timestamp_comp: Cmp content_length: int content_length_comp: Cmp class Query: # pylint: disable=too-many-instance-attributes """Document querying""" def __init__( # pylint: disable=too-many-arguments self, history_mode: HistoryMode = HistoryMode.LATEST, order_by_field: OrderByField = OrderByField.PATH, order_direction: OrderDirection = OrderDirection.ASC, start_after_local_index: Optional[int] = None, start_after_path: Optional[Path] = None, flt: Optional[QueryFilter] = None, limit: Optional[int] = None, limit_bytes: Optional[int] = None, ) -> None: if limit is not None and limit < 0: raise ValidationError('query limit must be 0 or greater') if start_after_local_index is not None and start_after_path is not None: raise ValidationError( 'query start position should be either a local_index or a path, but not both' ) if order_by_field == OrderByField.PATH and start_after_local_index is not None: raise ValidationError( 'query would order by path, but instructed to start at a local index' ) if order_by_field == OrderByField.LOCAL_INDEX and start_after_path is not None: raise ValidationError( 'query would order by local index, but instructed to start at a path' ) # TODO check if contentLength and timestamp are reasonable numbers if flt: if ( flt.get('path') and flt.get('path_prefix') is not None and not flt['path'].startswith(flt['path_prefix']) ): raise ValidationError( 'we were asked to match an exact path and a path prefix but they don’t match ' 'each other' ) if ( flt.get('path') and flt.get('path_suffix') is not None and not flt['path'].endswith(flt['path_suffix']) ): raise ValidationError( 'we were asked to match an exact path and a path suffix but they don’t ' 'match each other' ) self.history_mode = history_mode self.order_by_field = order_by_field self.order_direction = order_direction self.limit = limit self.limit_bytes = limit_bytes self.flt = flt self.start_after_local_index = start_after_local_index self.start_after_path = start_after_path self.prediction = ( QueryVerdict.ALL if self.history_mode == HistoryMode.ALL else QueryVerdict.ALL_LATEST ) if self.limit == 0: self.prediction = QueryVerdict.NOTHING elif ( flt or start_after_local_index is not None or start_after_path is not None or limit or limit_bytes ): self.prediction = QueryVerdict.SOME def match_document(self, document: Document) -> bool: """Check if document matches this query""" # pylint: disable=too-many-branches,too-many-return-statements if not self.flt: return True if self.flt.get('path') is not None and document.path != self.flt['path']: return False if self.flt.get('path_prefix') is not None and not document.path.startswith( self.flt['path_prefix'] ): return False if self.flt.get('path_suffix') is not None and not document.path.endswith( self.flt['path_suffix'] ): return False if self.flt.get('author') and document.author != self.flt['author']: return False if self.flt.get('timestamp'): comparator = self.flt.get('timestamp_comp', Cmp.EQ) if comparator == Cmp.EQ and document.timestamp != self.flt['timestamp']: return False if comparator == Cmp.GT and document.timestamp <= self.flt['timestamp']: return False if comparator == Cmp.LT and document.timestamp >= self.flt['timestamp']: return False if self.flt.get('content_length'): comparator = self.flt.get('content_length_comp', Cmp.EQ) if comparator == Cmp.EQ and document.content_length != self.flt['content_length']: return False if comparator == Cmp.GT and document.content_length <= self.flt['content_length']: return False if comparator == Cmp.LT and document.content_length >= self.flt['content_length']: return False return True def __call__(self, documents: Dict[Document, Dict[str, Any]]) -> List[Document]: """Filter a list of documents""" # pylint: disable=too-many-branches if self.prediction == QueryVerdict.NOTHING: return [] output = [document for document in documents if self.match_document(document)] if self.order_by_field == OrderByField.PATH: output.sort( key=lambda document: str(document.path), reverse=self.order_direction == OrderDirection.DESC, ) elif self.order_by_field == OrderByField.LOCAL_INDEX: output.sort( key=lambda document: documents[document]['local_index'], # type: ignore reverse=self.order_direction == OrderDirection.DESC, ) else: # pragma: no cover raise TypeError() start_at = 0 end_at = 0 if self.start_after_path is not None or self.start_after_local_index is not None: for idx, document in enumerate(output): if ( # pylint: disable=too-many-boolean-expressions self.start_after_path is not None and ( ( self.order_direction == OrderDirection.ASC and document.path > self.start_after_path ) or ( self.order_direction == OrderDirection.DESC and document.path < self.start_after_path ) ) ) or ( self.start_after_local_index is not None and ( ( self.order_direction == OrderDirection.ASC and documents[document]['local_index'] > self.start_after_local_index ) or ( self.order_direction == OrderDirection.DESC and documents[document]['local_index'] < self.start_after_local_index ) ) ): start_at = idx break # Apply count limit (ie. stop after self.limit items) if self.limit is not None: end_at = start_at + self.limit else: end_at = len(output) current_size = 0 new_end = None # Apply byte limit if self.limit_bytes is not None: for idx, document in enumerate(output[start_at:end_at]): if current_size + document.content_length > self.limit_bytes: new_end = idx break current_size += document.content_length if new_end is not None: end_at = start_at + new_end return output[start_at:end_at]