"""Document querying for replicas"""

from datetime import datetime
from enum import Enum, auto
from typing import Any, Dict, List, Optional, TypedDict

from .document import Document
from .exc import ValidationError
from .identity import Identity
from .path import Path


class HistoryMode(Enum):
    """Document query history mode"""

    ALL = auto()
    LATEST = auto()


class OrderByField(Enum):
    """Document ordering field"""

    PATH = auto()
    LOCAL_INDEX = auto()


class OrderDirection(Enum):
    """Document ordering direction"""

    ASC = auto()
    DESC = auto()


class Cmp(Enum):
    """Comparison options"""

    EQ = auto()
    LT = auto()
    GT = auto()


class QueryVerdict(Enum):
    """Basic verdict of a query"""

    ALL = auto()
    ALL_LATEST = auto()
    SOME = auto()
    NOTHING = auto()


class QueryFilter(TypedDict, total=False):
    """Filtering parameters for a query"""

    path: Path
    path_prefix: str
    path_suffix: str
    author: Identity
    timestamp: datetime
    timestamp_comp: Cmp
    content_length: int
    content_length_comp: Cmp
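

# Illustrative example (the concrete values here are assumptions, not taken from
# this module): because QueryFilter is declared with ``total=False``, any subset
# of its keys may be supplied.  A filter matching documents under a path prefix
# and larger than 100 bytes could look like:
#
#     flt: QueryFilter = {
#         'path_prefix': '/wiki/',
#         'content_length': 100,
#         'content_length_comp': Cmp.GT,
#     }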


class Query:  # pylint: disable=too-many-instance-attributes
    """Document querying"""
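
    # Usage sketch (illustrative only; the metadata stored alongside each
    # document is an assumption based on ``__call__`` below, which looks up a
    # ``local_index`` entry per document):
    #
    #     query = Query(
    #         history_mode=HistoryMode.LATEST,
    #         order_by_field=OrderByField.PATH,
    #         limit=10,
    #         flt={'path_prefix': '/wiki/'},
    #     )
    #     matching = query({document: {'local_index': 0}})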

    def __init__(  # pylint: disable=too-many-arguments
        self,
        history_mode: HistoryMode = HistoryMode.LATEST,
        order_by_field: OrderByField = OrderByField.PATH,
        order_direction: OrderDirection = OrderDirection.ASC,
        start_after_local_index: Optional[int] = None,
        start_after_path: Optional[Path] = None,
        flt: Optional[QueryFilter] = None,
        limit: Optional[int] = None,
        limit_bytes: Optional[int] = None,
    ) -> None:
        if limit is not None and limit < 0:
            raise ValidationError('query limit must be 0 or greater')

        if start_after_local_index is not None and start_after_path is not None:
            raise ValidationError(
                'query start position should be either a local_index or a path, but not both'
            )

        if order_by_field == OrderByField.PATH and start_after_local_index is not None:
            raise ValidationError(
                'query would order by path, but was instructed to start at a local index'
            )

        if order_by_field == OrderByField.LOCAL_INDEX and start_after_path is not None:
            raise ValidationError(
                'query would order by local index, but was instructed to start at a path'
            )

        # TODO check if content_length and timestamp are reasonable numbers

        if flt:
            if (
                flt.get('path')
                and flt.get('path_prefix') is not None
                and not flt['path'].startswith(flt['path_prefix'])
            ):
                raise ValidationError(
                    'we were asked to match an exact path and a path prefix, but they don’t '
                    'match each other'
                )

            if (
                flt.get('path')
                and flt.get('path_suffix') is not None
                and not flt['path'].endswith(flt['path_suffix'])
            ):
                raise ValidationError(
                    'we were asked to match an exact path and a path suffix, but they don’t '
                    'match each other'
                )

        self.history_mode = history_mode
        self.order_by_field = order_by_field
        self.order_direction = order_direction
        self.limit = limit
        self.limit_bytes = limit_bytes
        self.flt = flt
        self.start_after_local_index = start_after_local_index
        self.start_after_path = start_after_path
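
        # ``prediction`` is a coarse, up-front verdict of how much of a replica
        # this query could return: everything (ALL or ALL_LATEST depending on
        # the history mode), only a subset (SOME), or nothing at all (NOTHING,
        # e.g. when limit is 0).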
        self.prediction = (
            QueryVerdict.ALL if self.history_mode == HistoryMode.ALL else QueryVerdict.ALL_LATEST
        )

        if self.limit == 0:
            self.prediction = QueryVerdict.NOTHING
        elif (
            flt
            or start_after_local_index is not None
            or start_after_path is not None
            or limit
            or limit_bytes
        ):
            self.prediction = QueryVerdict.SOME

    def match_document(self, document: Document) -> bool:
        """Check if document matches this query"""
        # pylint: disable=too-many-branches,too-many-return-statements

        if not self.flt:
            return True

        if self.flt.get('path') is not None and document.path != self.flt['path']:
            return False

        if self.flt.get('path_prefix') is not None and not document.path.startswith(
            self.flt['path_prefix']
        ):
            return False

        if self.flt.get('path_suffix') is not None and not document.path.endswith(
            self.flt['path_suffix']
        ):
            return False

        if self.flt.get('author') and document.author != self.flt['author']:
            return False

        if self.flt.get('timestamp'):
            comparator = self.flt.get('timestamp_comp', Cmp.EQ)

            if comparator == Cmp.EQ and document.timestamp != self.flt['timestamp']:
                return False

            if comparator == Cmp.GT and document.timestamp <= self.flt['timestamp']:
                return False

            if comparator == Cmp.LT and document.timestamp >= self.flt['timestamp']:
                return False
        # 0 is a valid content length, so check for key presence rather than
        # truthiness here.
        if self.flt.get('content_length') is not None:
            comparator = self.flt.get('content_length_comp', Cmp.EQ)

            if comparator == Cmp.EQ and document.content_length != self.flt['content_length']:
                return False

            if comparator == Cmp.GT and document.content_length <= self.flt['content_length']:
                return False

            if comparator == Cmp.LT and document.content_length >= self.flt['content_length']:
                return False

        return True

    def __call__(self, documents: Dict[Document, Dict[str, Any]]) -> List[Document]:
        """Filter a list of documents"""

        # pylint: disable=too-many-branches

        if self.prediction == QueryVerdict.NOTHING:
            return []
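
        # match_document applies only the filter; ordering, the start position
        # and the limits are handled below.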
        output = [document for document in documents if self.match_document(document)]

        if self.order_by_field == OrderByField.PATH:
            output.sort(
                key=lambda document: str(document.path),
                reverse=self.order_direction == OrderDirection.DESC,
            )
        elif self.order_by_field == OrderByField.LOCAL_INDEX:
            output.sort(
                key=lambda document: documents[document]['local_index'],  # type: ignore
                reverse=self.order_direction == OrderDirection.DESC,
            )
        else:  # pragma: no cover
            raise TypeError()

        start_at = 0
        end_at = 0
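
        # Seek past the requested start position: advance start_at to the first
        # document that sorts strictly after start_after_path /
        # start_after_local_index in the chosen direction.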
        if self.start_after_path is not None or self.start_after_local_index is not None:
            for idx, document in enumerate(output):
                if (  # pylint: disable=too-many-boolean-expressions
                    self.start_after_path is not None
                    and (
                        (
                            self.order_direction == OrderDirection.ASC
                            and document.path > self.start_after_path
                        )
                        or (
                            self.order_direction == OrderDirection.DESC
                            and document.path < self.start_after_path
                        )
                    )
                ) or (
                    self.start_after_local_index is not None
                    and (
                        (
                            self.order_direction == OrderDirection.ASC
                            and documents[document]['local_index'] > self.start_after_local_index
                        )
                        or (
                            self.order_direction == OrderDirection.DESC
                            and documents[document]['local_index'] < self.start_after_local_index
                        )
                    )
                ):
                    start_at = idx
                    break

        # Apply count limit (i.e. stop after self.limit items)
        if self.limit is not None:
            end_at = start_at + self.limit
        else:
            end_at = len(output)

        current_size = 0
        new_end = None

        # Apply byte limit
        if self.limit_bytes is not None:
            for idx, document in enumerate(output[start_at:end_at]):
                if current_size + document.content_length > self.limit_bytes:
                    new_end = idx
                    break

                current_size += document.content_length

        if new_end is not None:
            end_at = start_at + new_end

        return output[start_at:end_at]