From 9631a57307cf01cd5686f4b541723e274c9ad61e Mon Sep 17 00:00:00 2001
From: Gergely Polonkai
Date: Fri, 6 May 2022 17:21:49 +0200
Subject: [PATCH] Create the Query class

---
Note: hand-edited after export; the blob ids on the "index" lines below still refer to the
originally exported file contents.

 earthsnake/query.py | 299 +++++++++++++++++++++++++++++
 tests/test_query.py | 459 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 758 insertions(+)
 create mode 100644 earthsnake/query.py
 create mode 100644 tests/test_query.py

diff --git a/earthsnake/query.py b/earthsnake/query.py
new file mode 100644
index 0000000..641fa4f
--- /dev/null
+++ b/earthsnake/query.py
@@ -0,0 +1,299 @@
+"""Document querying for replicas"""
+
+from datetime import datetime
+from enum import Enum, auto
+from typing import Any, Dict, List, Optional, TypedDict
+
+from .document import Document
+from .exc import ValidationError
+from .identity import Identity
+from .path import Path
+
+
+class HistoryMode(Enum):
+    """Document query history mode"""
+
+    ALL = auto()
+    LATEST = auto()
+
+
+class OrderByField(Enum):
+    """Document ordering field"""
+
+    PATH = auto()
+    LOCAL_INDEX = auto()
+
+
+class OrderDirection(Enum):
+    """Document ordering direction"""
+
+    ASC = auto()
+    DESC = auto()
+
+
+class Cmp(Enum):
+    """Comparison options"""
+
+    EQ = auto()
+    LT = auto()
+    GT = auto()
+
+
+class QueryVerdict(Enum):
+    """Basic verdict of a query"""
+
+    ALL = auto()  # no constraint is set and the history mode is ALL
+    ALL_LATEST = auto()  # no constraint is set and the history mode is LATEST
+    SOME = auto()  # a filter, a start position, or a limit is set
+    NOTHING = auto()  # the count limit is 0
+
+
+class QueryFilter(TypedDict, total=False):
+    """Filtering parameters for a query"""
+
+    path: Path
+    path_prefix: str
+    path_suffix: str
+    author: Identity
+    timestamp: datetime
+    timestamp_comp: Cmp
+    content_length: int
+    content_length_comp: Cmp
+
+
+class Query:  # pylint: disable=too-many-instance-attributes
+    """Document querying
+
+    A query is built once and can then be called with a mapping of documents to get back the
+    matching documents, ordered and cut according to the query parameters.
+
+    ``prediction`` is computed at construction time: ``NOTHING`` if ``limit`` is 0, ``SOME`` if
+    any filter, start position, or limit is set, otherwise ``ALL`` or ``ALL_LATEST`` depending
+    on ``history_mode``.
+    """
+
+    def __init__(  # pylint: disable=too-many-arguments
+        self,
+        history_mode: HistoryMode = HistoryMode.LATEST,
+        order_by_field: OrderByField = OrderByField.PATH,
+        order_direction: OrderDirection = OrderDirection.ASC,
+        start_after_local_index: Optional[int] = None,
+        start_after_path: Optional[Path] = None,
+        flt: Optional[QueryFilter] = None,
+        limit: Optional[int] = None,
+        limit_bytes: Optional[int] = None,
+    ) -> None:
+        """Validate the parameters and store them on the query
+
+        Raises ``ValidationError`` if ``limit`` is negative, if both start positions are given,
+        if the start position does not match ``order_by_field``, or if the exact path in ``flt``
+        contradicts its path prefix or path suffix.
+        """
+
+        if limit is not None and limit < 0:
+            raise ValidationError('query limit must be 0 or greater')
+
+        if start_after_local_index is not None and start_after_path is not None:
+            raise ValidationError(
+                'query start position should be either a local_index or a path, but not both'
+            )
+
+        if order_by_field == OrderByField.PATH and start_after_local_index is not None:
+            raise ValidationError(
+                'query would order by path, but instructed to start at a local index'
+            )
+
+        if order_by_field == OrderByField.LOCAL_INDEX and start_after_path is not None:
+            raise ValidationError(
+                'query would order by local index, but instructed to start at a path'
+            )
+
+        # TODO check if contentLength and timestamp are reasonable numbers
+
+        if flt:
+            if (
+                flt.get('path') is not None
+                and flt.get('path_prefix') is not None
+                and not flt['path'].startswith(flt['path_prefix'])
+            ):
+                raise ValidationError(
+                    'we were asked to match an exact path and a path prefix but they don’t match '
+                    'each other'
+                )
+
+            if (
+                flt.get('path') is not None
+                and flt.get('path_suffix') is not None
+                and not flt['path'].endswith(flt['path_suffix'])
+            ):
+                raise ValidationError(
+                    'we were asked to match an exact path and a path suffix but they don’t '
+                    'match each other'
+                )
+
+        self.history_mode = history_mode
+        self.order_by_field = order_by_field
+        self.order_direction = order_direction
+        self.limit = limit
+        self.limit_bytes = limit_bytes
+        self.flt = flt
+        self.start_after_local_index = start_after_local_index
+        self.start_after_path = start_after_path
+        self.prediction = (
+            QueryVerdict.ALL if self.history_mode == HistoryMode.ALL else QueryVerdict.ALL_LATEST
+        )
+
+        # Compare the limits against None: a byte limit of 0 still constrains the result
+        if self.limit == 0:
+            self.prediction = QueryVerdict.NOTHING
+        elif (
+            flt
+            or start_after_local_index is not None
+            or start_after_path is not None
+            or limit is not None
+            or limit_bytes is not None
+        ):
+            self.prediction = QueryVerdict.SOME
+
+    def match_document(self, document: Document) -> bool:
+        """Check if document matches this query
+
+        Only the filter (``flt``) is consulted; ordering, start position, and limits are applied
+        by ``__call__``.  A comparator tells how the document value must relate to the filter
+        value (``Cmp.GT`` keeps documents with a greater value) and defaults to ``Cmp.EQ``.
+        """
+        # pylint: disable=too-many-branches,too-many-return-statements
+
+        if not self.flt:
+            return True
+
+        if self.flt.get('path') is not None and document.path != self.flt['path']:
+            return False
+
+        if self.flt.get('path_prefix') is not None and not document.path.startswith(
+            self.flt['path_prefix']
+        ):
+            return False
+
+        if self.flt.get('path_suffix') is not None and not document.path.endswith(
+            self.flt['path_suffix']
+        ):
+            return False
+
+        if self.flt.get('author') is not None and document.author != self.flt['author']:
+            return False
+
+        if self.flt.get('timestamp') is not None:
+            comparator = self.flt.get('timestamp_comp', Cmp.EQ)
+
+            if comparator == Cmp.EQ and document.timestamp != self.flt['timestamp']:
+                return False
+
+            if comparator == Cmp.GT and document.timestamp <= self.flt['timestamp']:
+                return False
+
+            if comparator == Cmp.LT and document.timestamp >= self.flt['timestamp']:
+                return False
+
+        # Compare against None so that a filter value of 0 is not ignored
+        if self.flt.get('content_length') is not None:
+            comparator = self.flt.get('content_length_comp', Cmp.EQ)
+
+            if comparator == Cmp.EQ and document.content_length != self.flt['content_length']:
+                return False
+
+            if comparator == Cmp.GT and document.content_length <= self.flt['content_length']:
+                return False
+
+            if comparator == Cmp.LT and document.content_length >= self.flt['content_length']:
+                return False
+
+        return True
+
+    def __call__(self, documents: Dict[Document, Dict[str, Any]]) -> List[Document]:
+        """Filter a list of documents
+
+        ``documents`` maps each document to its metadata; the ``local_index`` metadata key is
+        read when ordering by, or starting after, a local index.  Returns the matching documents
+        in query order, beginning after the start position and cut at the count and byte
+        limits.
+        """
+
+        # pylint: disable=too-many-branches
+
+        if self.prediction == QueryVerdict.NOTHING:
+            return []
+
+        output = [document for document in documents if self.match_document(document)]
+
+        if self.order_by_field == OrderByField.PATH:
+            output.sort(
+                key=lambda document: str(document.path),
+                reverse=self.order_direction == OrderDirection.DESC,
+            )
+        elif self.order_by_field == OrderByField.LOCAL_INDEX:
+            output.sort(
+                key=lambda document: documents[document]['local_index'],  # type: ignore
+                reverse=self.order_direction == OrderDirection.DESC,
+            )
+        else:  # pragma: no cover
+            raise TypeError()
+
+        start_at = 0
+
+        if self.start_after_path is not None or self.start_after_local_index is not None:
+            # Nothing lies after the start position unless the loop below finds a document
+            start_at = len(output)
+
+            for idx, document in enumerate(output):
+                if (  # pylint: disable=too-many-boolean-expressions
+                    self.start_after_path is not None
+                    and (
+                        (
+                            self.order_direction == OrderDirection.ASC
+                            and document.path > self.start_after_path
+                        )
+                        or (
+                            self.order_direction == OrderDirection.DESC
+                            and document.path < self.start_after_path
+                        )
+                    )
+                ) or (
+                    self.start_after_local_index is not None
+                    and (
+                        (
+                            self.order_direction == OrderDirection.ASC
+                            and documents[document]['local_index'] > self.start_after_local_index
+                        )
+                        or (
+                            self.order_direction == OrderDirection.DESC
+                            and documents[document]['local_index'] < self.start_after_local_index
+                        )
+                    )
+                ):
+                    start_at = idx
+                    break
+
+        # Apply count limit (ie. stop after self.limit items)
+        if self.limit is not None:
+            end_at = start_at + self.limit
+        else:
+            end_at = len(output)
+
+        current_size = 0
+        new_end = None
+
+        # Apply byte limit
+        if self.limit_bytes is not None:
+            for idx, document in enumerate(output[start_at:end_at]):
+                if current_size + document.content_length > self.limit_bytes:
+                    new_end = idx
+
+                    break
+
+                current_size += document.content_length
+
+        if new_end is not None:
+            end_at = start_at + new_end
+
+        return output[start_at:end_at]
diff --git a/tests/test_query.py b/tests/test_query.py
new file mode 100644
index 0000000..b96f995
--- /dev/null
+++ b/tests/test_query.py
@@ -0,0 +1,459 @@
+"""Tests for document querying"""
+
+from datetime import datetime, timedelta, timezone
+
+from typing import Literal
+
+import pytest
+
+from earthsnake.document.es4 import Es4Document
+from earthsnake.exc import ValidationError
+from earthsnake.identity import Identity
+from earthsnake.path import Path
+from earthsnake.query import (
+    Cmp,
+    HistoryMode,
+    OrderByField,
+    OrderDirection,
+    Query,
+    QueryFilter,
+    QueryVerdict,
+)
+
+
+def test_init_negative_limit() -> None:
+    """Test if initialization fails with a negative limit"""
+
+    with pytest.raises(ValidationError) as ctx:
+        Query(limit=-1)
+
+    assert str(ctx.value) == 'query limit must be 0 or greater'
+
+
+def test_init_mutual_exclusive_start_after() -> None:
+    """Test if start_after_local_index and start_after_path are mutually exclusive"""
+
+    with pytest.raises(ValidationError) as ctx:
+        Query(start_after_local_index=1, start_after_path=Path('/test.txt'))
+
+    assert (
+        str(ctx.value)
+        == 'query start position should be either a local_index or a path, but not both'
+    )
+
+
+def test_init_order_start_after_mismatch_path() -> None:
+    """Test if init fails if ordering by path but starting at a local index"""
+
+    with pytest.raises(ValidationError) as ctx:
+        Query(start_after_local_index=1, order_by_field=OrderByField.PATH)
+
+    assert str(ctx.value) == 'query would order by path, but instructed to start at a local index'
+
+
+def test_init_order_start_after_mismatch_local_index() -> None:
+    """Test if init fails if ordering by local_index but starting at a path"""
+
+    with pytest.raises(ValidationError) as ctx:
+        Query(start_after_path=Path('/test.txt'), order_by_field=OrderByField.LOCAL_INDEX)
+
+    assert str(ctx.value) == 'query would order by local index, but instructed to start at a path'
+
+
+def test_init_path_startswith_mismatch() -> None:
+    """Test if init fails if we match an exact path and a path prefix that doesn’t match path"""
+
+    with pytest.raises(ValidationError) as ctx:
+        Query(flt=QueryFilter(path=Path('/test.txt'), path_prefix='/other.md'))
+
+    assert (
+        str(ctx.value)
+        == 'we were asked to match an exact path and a path prefix but they don’t match each other'
+    )
+
+
+def test_init_path_endswith_mismatch() -> None:
+    """Test if init fails if we match an exact path and a path suffix that doesn’t match path"""
+
+    with pytest.raises(ValidationError) as ctx:
+        Query(flt=QueryFilter(path=Path('/test.txt'), path_suffix='.md'))
+
+    assert (
+        str(ctx.value)
+        == 'we were asked to match an exact path and a path suffix but they don’t match each other'
+    )
+
+
+def test_init_empty_prediction() -> None:
+    """Test if verdict is NOTHING if limit is zero"""
+
+    query = Query(limit=0)
+
+    assert query.prediction == QueryVerdict.NOTHING
+
+
+def test_init_filter_prediction() -> None:
+    """Test if verdict is SOME if we have a filter set"""
+
+    query = Query(flt=QueryFilter(path=Path('/test.txt')))
+
+    assert query.prediction == QueryVerdict.SOME
+
+
+def test_init_zero_byte_limit_prediction() -> None:
+    """Test if verdict is SOME if the byte limit is zero"""
+
+    query = Query(limit_bytes=0)
+
+    assert query.prediction == QueryVerdict.SOME
+
+
+@pytest.mark.parametrize(
+    'history_mode,prediction',
+    (
+        pytest.param(HistoryMode.ALL, QueryVerdict.ALL),
+        pytest.param(HistoryMode.LATEST, QueryVerdict.ALL_LATEST),
+    ),
+)
+def test_init_all_prediction(history_mode: HistoryMode, prediction: QueryVerdict) -> None:
+    """Test if verdict is ALL/ALL_LATEST if we don’t have any filtering parameters set"""
+
+    query = Query(history_mode=history_mode)
+
+    assert query.prediction == prediction
+
+
+def test_match_doc_no_filter(es4_document: Es4Document) -> None:
+    """Test if the empty filter works correctly"""
+
+    query = Query()
+
+    assert query.match_document(es4_document) is True
+
+
+def test_match_doc_path(es4_document: Es4Document) -> None:
+    """Test if the path filter works correctly"""
+    query = Query(flt=QueryFilter(path=Path('/test.txt')))
+
+    es4_document.path = Path('/test.txt')
+    assert query.match_document(es4_document)
+
+    es4_document.path = Path('/test.md')
+    assert not query.match_document(es4_document)
+
+
+def test_match_doc_path_prefix(es4_document: Es4Document) -> None:
+    """Test if the path prefix filter works correctly"""
+    query = Query(flt=QueryFilter(path_prefix='/test'))
+
+    es4_document.path = Path('/test.txt')
+    assert query.match_document(es4_document)
+
+    es4_document.path = Path('/other.txt')
+    assert not query.match_document(es4_document)
+
+
+def test_match_doc_path_suffix(es4_document: Es4Document) -> None:
+    """Test if the path suffix filter works correctly"""
+
+    query = Query(flt=QueryFilter(path_suffix='.txt'))
+
+    es4_document.path = Path('/test.txt')
+    assert query.match_document(es4_document)
+
+    es4_document.path = Path('/test.md')
+    assert not query.match_document(es4_document)
+
+
+def test_match_doc_author(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if the author filter works correctly"""
+
+    query = Query(flt=QueryFilter(author=identity))
+
+    es4_document.author = identity
+    assert query.match_document(es4_document)
+
+    es4_document.author = Identity.generate('name')
+    assert not query.match_document(es4_document)
+
+
+@pytest.mark.parametrize('cmp_usage', ('explicit', 'implicit'))
+def test_match_timestamp_eq(
+    cmp_usage: Literal['explicit', 'implicit'],
+    es4_document: Es4Document,
+) -> None:
+    """Test if the timestamp filter works correctly with the EQ comparator"""
+
+    flt = QueryFilter(timestamp=datetime(2022, 5, 5, 8, 21, 11, 668993, tzinfo=timezone.utc))
+
+    if cmp_usage == 'explicit':
+        flt['timestamp_comp'] = Cmp.EQ
+
+    query = Query(flt=flt)
+
+    assert query.match_document(es4_document)
+
+    es4_document.timestamp += timedelta(days=1)
+    assert not query.match_document(es4_document)
+
+
+def test_match_timestamp_gt(es4_document: Es4Document) -> None:
+    """Test if the timestamp filter works correctly with the GT comparator"""
+
+    flt = QueryFilter(
+        timestamp=datetime(2022, 5, 5, 8, 21, 11, 668992, tzinfo=timezone.utc),
+        timestamp_comp=Cmp.GT,
+    )
+
+    query = Query(flt=flt)
+
+    assert query.match_document(es4_document)
+
+    es4_document.timestamp -= timedelta(microseconds=1)
+    assert not query.match_document(es4_document)
+
+
+def test_match_timestamp_lt(es4_document: Es4Document) -> None:
+    """Test if the timestamp filter works correctly with the LT comparator"""
+
+    flt = QueryFilter(
+        timestamp=datetime(2022, 5, 5, 8, 21, 11, 668994, tzinfo=timezone.utc),
+        timestamp_comp=Cmp.LT,
+    )
+
+    query = Query(flt=flt)
+
+    assert query.match_document(es4_document)
+
+    es4_document.timestamp += timedelta(microseconds=1)
+    assert not query.match_document(es4_document)
+
+
+@pytest.mark.parametrize('cmp_usage', ('explicit', 'implicit'))
+def test_match_content_length_eq(
+    cmp_usage: Literal['explicit', 'implicit'],
+    es4_document: Es4Document,
+) -> None:
+    """Test if the content_length filter works correctly with the EQ comparator"""
+
+    flt = QueryFilter(content_length=4)
+
+    if cmp_usage == 'explicit':
+        flt['content_length_comp'] = Cmp.EQ
+
+    query = Query(flt=flt)
+
+    assert query.match_document(es4_document)
+
+    es4_document.content = 'other'
+    assert not query.match_document(es4_document)
+
+
+def test_match_content_length_gt(es4_document: Es4Document) -> None:
+    """Test if the content_length filter works correctly with the GT comparator"""
+
+    flt = QueryFilter(content_length=4, content_length_comp=Cmp.GT)
+
+    query = Query(flt=flt)
+
+    assert not query.match_document(es4_document)
+
+    es4_document.content = 'other'
+    assert query.match_document(es4_document)
+
+
+def test_match_content_length_lt(es4_document: Es4Document) -> None:
+    """Test if the content_length filter works correctly with the LT comparator"""
+
+    flt = QueryFilter(content_length=4, content_length_comp=Cmp.LT)
+
+    query = Query(flt=flt)
+
+    assert not query.match_document(es4_document)
+
+    es4_document.content = 'see'
+    assert query.match_document(es4_document)
+
+
+def test_match_content_length_zero(es4_document: Es4Document) -> None:
+    """Test if a content_length filter of zero is not ignored"""
+
+    query = Query(flt=QueryFilter(content_length=0))
+
+    assert not query.match_document(es4_document)
+
+
+def test_query_filter_nothing_prediction(es4_document: Es4Document) -> None:
+    """Test if the query predicted NOTHING, it returns an empty list"""
+
+    query = Query(limit=0)
+
+    assert not query({es4_document: {}})
+
+
+@pytest.mark.parametrize('cmp_usage', ('implicit', 'explicit'))
+def test_query_filter_path_sort(
+    cmp_usage: Literal['implicit', 'explicit'],
+    es4_document: Es4Document,
+    identity: Identity,
+) -> None:
+    """Test if ordering by path works"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+
+    if cmp_usage == 'implicit':
+        query = Query(order_by_field=OrderByField.PATH)
+    else:
+        query = Query(order_by_field=OrderByField.PATH, order_direction=OrderDirection.ASC)
+
+    assert query({es4_document: {}, other_document: {}}) == [other_document, es4_document]
+
+
+def test_query_filter_path_sort_desc(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if ordering by path in descending order works"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+    query = Query(order_by_field=OrderByField.PATH, order_direction=OrderDirection.DESC)
+
+    assert query({other_document: {}, es4_document: {}}) == [es4_document, other_document]
+
+
+@pytest.mark.parametrize('cmp_usage', ('implicit', 'explicit'))
+def test_query_filter_local_index_sort(
+    cmp_usage: Literal['implicit', 'explicit'],
+    es4_document: Es4Document,
+    identity: Identity,
+) -> None:
+    """Test if ordering by local index works"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+
+    if cmp_usage == 'implicit':
+        query = Query(order_by_field=OrderByField.LOCAL_INDEX)
+    else:
+        query = Query(order_by_field=OrderByField.LOCAL_INDEX, order_direction=OrderDirection.ASC)
+
+    assert query({es4_document: {'local_index': 2}, other_document: {'local_index': 1}}) == [
+        other_document,
+        es4_document,
+    ]
+
+
+def test_query_filter_local_index_sort_desc(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if ordering by local index in descending order works"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+    query = Query(order_by_field=OrderByField.LOCAL_INDEX, order_direction=OrderDirection.DESC)
+
+    assert query({other_document: {'local_index': 1}, es4_document: {'local_index': 2}}) == [
+        es4_document,
+        other_document,
+    ]
+
+
+def test_start_after_path_asc(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if start_after_path works with ascending sort"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+    query = Query(order_by_field=OrderByField.PATH, start_after_path=Path('/much.txt'))
+
+    assert query({other_document: {}, es4_document: {}}) == [es4_document]
+
+
+def test_start_after_path_desc(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if start_after_path works with descending sort"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+    query = Query(
+        order_by_field=OrderByField.PATH,
+        order_direction=OrderDirection.DESC,
+        start_after_path=Path('/test.txt'),
+    )
+
+    assert query({other_document: {}, es4_document: {}}) == [other_document]
+
+
+def test_start_after_local_index_asc(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if start_after_local_index works with ascending sort"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+    query = Query(order_by_field=OrderByField.LOCAL_INDEX, start_after_local_index=1)
+
+    assert query({other_document: {'local_index': 2}, es4_document: {'local_index': 1}}) == [
+        other_document
+    ]
+
+
+def test_start_after_local_index_desc(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if start_after_local_index works with descending sort"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+    query = Query(
+        order_by_field=OrderByField.LOCAL_INDEX,
+        order_direction=OrderDirection.DESC,
+        start_after_local_index=2,
+    )
+
+    assert query({other_document: {'local_index': 1}, es4_document: {'local_index': 2}}) == [
+        other_document
+    ]
+
+
+def test_start_after_with_empty_output() -> None:
+    """Test if start_after does not fail with an empty document list"""
+
+    query = Query(start_after_path=Path('/test.txt'))
+
+    assert query({}) == []
+
+
+def test_start_after_past_last_document(es4_document: Es4Document) -> None:
+    """Test if start_after returns nothing if no document lies after the start position"""
+
+    query = Query(order_by_field=OrderByField.LOCAL_INDEX, start_after_local_index=1)
+
+    assert query({es4_document: {'local_index': 1}}) == []
+
+
+@pytest.mark.parametrize('use_start_after', (True, False), ids=('with_start_after', 'without_start_after'))
+def test_limit(use_start_after: bool, es4_document: Es4Document, identity: Identity) -> None:
+    """Test if document count limiting works"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
+
+    if use_start_after:
+        query = Query(start_after_path=Path('/much.txt'), limit=1)
+        expected = es4_document
+    else:
+        query = Query(limit=1)
+        expected = other_document
+
+    assert query({es4_document: {}, other_document: {}}) == [expected]
+
+
+def test_byte_limiting_not_reached(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if byte limiting doesn’t do anything if the sum of document sizes is not reached"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'), content='much')
+    query = Query(limit_bytes=15)
+
+    assert query({es4_document: {}, other_document: {}}) == [other_document, es4_document]
+
+
+def test_byte_limiting(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if byte limiting works as expected"""
+
+    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'), content='much')
+    query = Query(limit_bytes=5)
+
+    assert query({es4_document: {}, other_document: {}}) == [other_document]
+
+
+def test_byte_limiting_with_start_after(es4_document: Es4Document, identity: Identity) -> None:
+    """Test if byte limiting works as expected when not starting from the beginning"""
+
+    other_document1 = Es4Document(identity, es4_document.share, Path('/much1.txt'), content='much1')
+    other_document2 = Es4Document(identity, es4_document.share, Path('/much2.txt'), content='much2')
+    query = Query(limit_bytes=5, start_after_path=Path('/much1.txt'))
+
+    assert query({es4_document: {}, other_document1: {}, other_document2: {}}) == [other_document2]