Create the Query class

This commit is contained in:
Gergely Polonkai 2022-05-06 17:21:49 +02:00
parent b0a7719bf1
commit 9631a57307
No known key found for this signature in database
GPG Key ID: 2D2885533B869ED4
2 changed files with 704 additions and 0 deletions

269
earthsnake/query.py Normal file
View File

@@ -0,0 +1,269 @@
"""Document querying for replicas"""
from datetime import datetime
from enum import Enum, auto
from typing import Any, Dict, List, Optional, TypedDict
from .document import Document
from .exc import ValidationError
from .identity import Identity
from .path import Path
class HistoryMode(Enum):
    """Which document versions a query is interested in"""

    # Every stored version of a document
    ALL = auto()
    # Only the most recent version of a document
    LATEST = auto()
class OrderByField(Enum):
    """Field by which query results are sorted"""

    # Sort by the document's path
    PATH = auto()
    # Sort by the ``local_index`` value stored alongside the document
    LOCAL_INDEX = auto()
class OrderDirection(Enum):
    """Direction in which query results are sorted"""

    # Ascending order (smallest first)
    ASC = auto()
    # Descending order (largest first)
    DESC = auto()
class Cmp(Enum):
    """Comparison operators usable in query filters"""

    # The document's value must equal the filter value
    EQ = auto()
    # The document's value must be less than the filter value
    LT = auto()
    # The document's value must be greater than the filter value
    GT = auto()
class QueryVerdict(Enum):
    """Up-front estimate of what a query will return"""

    # No restriction at all, every version of every document
    ALL = auto()
    # No restriction, but only the latest version of every document
    ALL_LATEST = auto()
    # Some restriction (filter, start position or limit) is in effect
    SOME = auto()
    # The query cannot return anything
    NOTHING = auto()
class QueryFilter(TypedDict, total=False):
    """Optional filtering criteria of a query

    Every key may be omitted (``total=False``); only the keys that are present restrict the
    result.
    """

    # Exact path the document must have
    path: Path
    # String the document's path must start with
    path_prefix: str
    # String the document's path must end with
    path_suffix: str

    # Author the document must have
    author: Identity

    # Reference timestamp and the comparison to apply to it (EQ when no comparator is given)
    timestamp: datetime
    timestamp_comp: Cmp

    # Reference content length and the comparison to apply to it (EQ when no comparator is given)
    content_length: int
    content_length_comp: Cmp
class Query:  # pylint: disable=too-many-instance-attributes
    """Document querying

    A query combines an optional filter, an ordering, an optional "start after" position and
    optional count/byte limits.  Calling the instance with a mapping of documents returns the
    matching documents as a sorted and trimmed list.

    The ``prediction`` attribute is a verdict computed from the parameters alone:

    * ``NOTHING`` if ``limit`` is zero,
    * ``SOME`` if a filter, a start position or any limit is set,
    * ``ALL`` or ``ALL_LATEST`` (depending on ``history_mode``) otherwise.
    """

    def __init__(  # pylint: disable=too-many-arguments
        self,
        history_mode: HistoryMode = HistoryMode.LATEST,
        order_by_field: OrderByField = OrderByField.PATH,
        order_direction: OrderDirection = OrderDirection.ASC,
        start_after_local_index: Optional[int] = None,
        start_after_path: Optional[Path] = None,
        flt: Optional[QueryFilter] = None,
        limit: Optional[int] = None,
        limit_bytes: Optional[int] = None,
    ) -> None:
        """Validate and store the query parameters

        :param history_mode: stored on the query and used to pick the default prediction
        :param order_by_field: field to sort the results by
        :param order_direction: direction of the sort
        :param start_after_local_index: only return documents past this local index; requires
            ordering by local index
        :param start_after_path: only return documents past this path; requires ordering by path
        :param flt: filtering criteria; an empty or missing filter matches every document
        :param limit: maximum number of documents to return
        :param limit_bytes: maximum cumulative content length of the returned documents
        :raises ValidationError: if ``limit`` is negative, if both start positions are given, if
            the start position does not fit the ordering field, or if the exact path of the
            filter contradicts its path prefix or suffix
        """

        if limit is not None and limit < 0:
            raise ValidationError('query limit must be 0 or greater')

        if start_after_local_index is not None and start_after_path is not None:
            raise ValidationError(
                'query start position should be either a local_index or a path, but not both'
            )

        if order_by_field == OrderByField.PATH and start_after_local_index is not None:
            raise ValidationError(
                'query would order by path, but instructed to start at a local index'
            )

        if order_by_field == OrderByField.LOCAL_INDEX and start_after_path is not None:
            raise ValidationError(
                'query would order by local index, but instructed to start at a path'
            )

        # TODO check if contentLength and timestamp are reasonable numbers

        if flt:
            exact_path = flt.get('path')
            path_prefix = flt.get('path_prefix')
            path_suffix = flt.get('path_suffix')

            # An exact path that contradicts the prefix/suffix could never match anything, so
            # such a filter is reported as an error instead.
            if exact_path is not None:
                if path_prefix is not None and not exact_path.startswith(path_prefix):
                    raise ValidationError(
                        'we were asked to match an exact path and a path prefix but they dont '
                        'match each other'
                    )

                if path_suffix is not None and not exact_path.endswith(path_suffix):
                    raise ValidationError(
                        'we were asked to match an exact path and a path suffix but they dont '
                        'match each other'
                    )

        self.history_mode = history_mode
        self.order_by_field = order_by_field
        self.order_direction = order_direction
        self.limit = limit
        self.limit_bytes = limit_bytes
        self.flt = flt
        self.start_after_local_index = start_after_local_index
        self.start_after_path = start_after_path

        if self.limit == 0:
            self.prediction = QueryVerdict.NOTHING
        elif (
            flt
            or start_after_local_index is not None
            or start_after_path is not None
            or limit is not None
            # Compared against None on purpose: a byte limit of zero is still a restriction
            or limit_bytes is not None
        ):
            self.prediction = QueryVerdict.SOME
        elif self.history_mode == HistoryMode.ALL:
            self.prediction = QueryVerdict.ALL
        else:
            self.prediction = QueryVerdict.ALL_LATEST

    @staticmethod
    def _passes_comparison(actual: Any, comparator: Any, expected: Any) -> bool:
        """Tell if ``actual`` relates to ``expected`` as ``comparator`` demands

        A comparator that is not a known ``Cmp`` member does not restrict anything.
        """

        if comparator == Cmp.EQ:
            return bool(actual == expected)

        if comparator == Cmp.GT:
            return bool(actual > expected)

        if comparator == Cmp.LT:
            return bool(actual < expected)

        return True

    def match_document(self, document: Document) -> bool:
        """Check if document matches this query

        Only the filter is evaluated here; ordering, start position and limits are not.

        :param document: the document to check
        :returns: ``True`` if there is no filter or the document satisfies every criterion
            present in the filter, ``False`` otherwise
        """

        # pylint: disable=too-many-return-statements

        if not self.flt:
            return True

        exact_path = self.flt.get('path')
        if exact_path is not None and document.path != exact_path:
            return False

        path_prefix = self.flt.get('path_prefix')
        if path_prefix is not None and not document.path.startswith(path_prefix):
            return False

        path_suffix = self.flt.get('path_suffix')
        if path_suffix is not None and not document.path.endswith(path_suffix):
            return False

        author = self.flt.get('author')
        if author is not None and document.author != author:
            return False

        timestamp = self.flt.get('timestamp')
        if timestamp is not None and not self._passes_comparison(
            document.timestamp, self.flt.get('timestamp_comp', Cmp.EQ), timestamp
        ):
            return False

        # Compared against None rather than tested for truthiness, so that a content length of
        # zero is a usable filter value.
        content_length = self.flt.get('content_length')
        if content_length is not None and not self._passes_comparison(
            document.content_length, self.flt.get('content_length_comp', Cmp.EQ), content_length
        ):
            return False

        return True

    def _is_past_start(self, document: Document, documents: Dict[Document, Dict[str, Any]]) -> bool:
        """Tell if document lies strictly past the start position, in the sort direction"""

        ascending = self.order_direction == OrderDirection.ASC
        descending = self.order_direction == OrderDirection.DESC

        if self.start_after_path is not None and (
            (ascending and document.path > self.start_after_path)
            or (descending and document.path < self.start_after_path)
        ):
            return True

        if self.start_after_local_index is not None:
            local_index = documents[document]['local_index']

            return bool(
                (ascending and local_index > self.start_after_local_index)
                or (descending and local_index < self.start_after_local_index)
            )

        return False

    def __call__(self, documents: Dict[Document, Dict[str, Any]]) -> List[Document]:
        """Filter a list of documents

        :param documents: mapping of documents to their metadata.  The metadata must hold a
            ``local_index`` item when ordering by, or starting after, a local index.
        :returns: the documents matching the filter, sorted as requested, starting with the first
            one past the start position (if any), cut to at most ``limit`` documents and at most
            ``limit_bytes`` bytes of cumulative content length
        """

        # NOTE(review): history_mode is not consulted here; presumably the caller is expected to
        # pass in only the latest versions when it needs them -- confirm.

        if self.prediction == QueryVerdict.NOTHING:
            return []

        output = [document for document in documents if self.match_document(document)]
        descending = self.order_direction == OrderDirection.DESC

        if self.order_by_field == OrderByField.PATH:
            output.sort(key=lambda document: str(document.path), reverse=descending)
        elif self.order_by_field == OrderByField.LOCAL_INDEX:
            output.sort(
                key=lambda document: documents[document]['local_index'],  # type: ignore
                reverse=descending,
            )
        else:  # pragma: no cover
            raise TypeError()

        start_at = 0

        if self.start_after_path is not None or self.start_after_local_index is not None:
            # If no document is past the start position, there is nothing to return, so assume
            # that until a suitable document is found.
            start_at = len(output)

            for idx, document in enumerate(output):
                if self._is_past_start(document, documents):
                    start_at = idx

                    break

        # Apply count limit (ie. stop after self.limit items)
        end_at = len(output) if self.limit is None else start_at + self.limit

        # Apply byte limit (ie. stop before the document that would exceed self.limit_bytes)
        if self.limit_bytes is not None:
            current_size = 0

            for idx, document in enumerate(output[start_at:end_at]):
                if current_size + document.content_length > self.limit_bytes:
                    end_at = start_at + idx

                    break

                current_size += document.content_length

        return output[start_at:end_at]

435
tests/test_query.py Normal file
View File

@@ -0,0 +1,435 @@
"""Tests for document querying"""
from datetime import datetime, timedelta, timezone
from typing import Literal
import pytest
from earthsnake.document.es4 import Es4Document
from earthsnake.exc import ValidationError
from earthsnake.identity import Identity
from earthsnake.path import Path
from earthsnake.query import (
Cmp,
HistoryMode,
OrderByField,
OrderDirection,
Query,
QueryFilter,
QueryVerdict,
)
def test_init_negative_limit() -> None:
    """A negative ``limit`` must be rejected at construction time"""

    with pytest.raises(ValidationError) as exc_info:
        Query(limit=-1)

    message = str(exc_info.value)
    assert message == 'query limit must be 0 or greater'
def test_init_mutual_exclusive_start_after() -> None:
    """Giving both a local index and a path as start position must be rejected"""

    expected = 'query start position should be either a local_index or a path, but not both'

    with pytest.raises(ValidationError) as exc_info:
        Query(start_after_local_index=1, start_after_path=Path('/test.txt'))

    assert str(exc_info.value) == expected
def test_init_order_start_after_mismatch_path() -> None:
    """Ordering by path while starting after a local index must be rejected"""

    with pytest.raises(ValidationError) as exc_info:
        Query(start_after_local_index=1, order_by_field=OrderByField.PATH)

    message = str(exc_info.value)
    assert message == 'query would order by path, but instructed to start at a local index'
def test_init_order_start_after_mismatch_local_index() -> None:
    """Ordering by local index while starting after a path must be rejected"""

    with pytest.raises(ValidationError) as exc_info:
        Query(start_after_path=Path('/test.txt'), order_by_field=OrderByField.LOCAL_INDEX)

    message = str(exc_info.value)
    assert message == 'query would order by local index, but instructed to start at a path'
def test_init_path_startswith_mismatch() -> None:
    """An exact path that does not start with the requested prefix must be rejected"""

    expected = (
        'we were asked to match an exact path and a path prefix but they dont match each other'
    )
    contradicting_filter = QueryFilter(path=Path('/test.txt'), path_prefix='/other.md')

    with pytest.raises(ValidationError) as exc_info:
        Query(flt=contradicting_filter)

    assert str(exc_info.value) == expected
def test_init_path_endswith_mismatch() -> None:
    """An exact path that does not end with the requested suffix must be rejected"""

    expected = (
        'we were asked to match an exact path and a path suffix but they dont match each other'
    )
    contradicting_filter = QueryFilter(path=Path('/test.txt'), path_suffix='.md')

    with pytest.raises(ValidationError) as exc_info:
        Query(flt=contradicting_filter)

    assert str(exc_info.value) == expected
def test_init_empty_prediction() -> None:
    """A limit of zero makes the query predict ``NOTHING``"""

    assert Query(limit=0).prediction == QueryVerdict.NOTHING
def test_init_filter_prediction() -> None:
    """Setting a filter makes the query predict ``SOME``"""

    path_filter = QueryFilter(path=Path('/test.txt'))

    assert Query(flt=path_filter).prediction == QueryVerdict.SOME
@pytest.mark.parametrize(
    'history_mode,prediction',
    (
        pytest.param(HistoryMode.ALL, QueryVerdict.ALL),
        pytest.param(HistoryMode.LATEST, QueryVerdict.ALL_LATEST),
    ),
)
def test_init_all_prediction(history_mode: HistoryMode, prediction: QueryVerdict) -> None:
    """Without any restricting parameter the prediction follows the history mode"""

    unrestricted = Query(history_mode=history_mode)

    assert unrestricted.prediction == prediction
def test_match_doc_no_filter(es4_document: Es4Document) -> None:
    """A query without a filter matches any document"""

    matched = Query().match_document(es4_document)

    assert matched is True
def test_match_doc_path(es4_document: Es4Document) -> None:
    """The exact path filter accepts the same path and rejects a different one"""

    path_query = Query(flt=QueryFilter(path=Path('/test.txt')))

    # Same path as in the filter
    es4_document.path = Path('/test.txt')
    assert path_query.match_document(es4_document)

    # Different path
    es4_document.path = Path('/test.md')
    assert not path_query.match_document(es4_document)
def test_match_doc_path_prefix(es4_document: Es4Document) -> None:
    """The path prefix filter accepts paths with the prefix and rejects others"""

    prefix_query = Query(flt=QueryFilter(path_prefix='/test'))

    # Path starting with the prefix
    es4_document.path = Path('/test.txt')
    assert prefix_query.match_document(es4_document)

    # Path starting with something else
    es4_document.path = Path('/other.txt')
    assert not prefix_query.match_document(es4_document)
def test_match_doc_path_suffix(es4_document: Es4Document) -> None:
    """The path suffix filter accepts paths with the suffix and rejects others"""

    suffix_query = Query(flt=QueryFilter(path_suffix='.txt'))

    # Path ending with the suffix
    es4_document.path = Path('/test.txt')
    assert suffix_query.match_document(es4_document)

    # Path ending with something else
    es4_document.path = Path('/test.md')
    assert not suffix_query.match_document(es4_document)
def test_match_doc_author(es4_document: Es4Document, identity: Identity) -> None:
    """The author filter accepts the given author and rejects a different one"""

    author_query = Query(flt=QueryFilter(author=identity))

    # Document written by the author in the filter
    es4_document.author = identity
    assert author_query.match_document(es4_document)

    # Document written by a freshly generated identity
    es4_document.author = Identity.generate('name')
    assert not author_query.match_document(es4_document)
@pytest.mark.parametrize('cmp_usage', ('explicit', 'implicit'))
def test_match_timestamp_eq(
    cmp_usage: Literal['explicit', 'implicit'],
    es4_document: Es4Document,
) -> None:
    """The timestamp filter with the EQ comparator, given explicitly or left as the default

    The filter timestamp is expected to be the timestamp of the fixture document.
    """

    moment = datetime(2022, 5, 5, 8, 21, 11, 668993, tzinfo=timezone.utc)
    criteria = QueryFilter(timestamp=moment)

    if cmp_usage == 'explicit':
        criteria['timestamp_comp'] = Cmp.EQ

    eq_query = Query(flt=criteria)

    assert eq_query.match_document(es4_document)

    # A document created a day later must not match any more
    es4_document.timestamp += timedelta(days=1)
    assert not eq_query.match_document(es4_document)
def test_match_timestamp_gt(es4_document: Es4Document) -> None:
    """The timestamp filter with the GT comparator

    The filter timestamp is expected to be one microsecond before the fixture document's.
    """

    threshold = datetime(2022, 5, 5, 8, 21, 11, 668992, tzinfo=timezone.utc)
    gt_query = Query(flt=QueryFilter(timestamp=threshold, timestamp_comp=Cmp.GT))

    assert gt_query.match_document(es4_document)

    # Move the document one microsecond back; it must not match any more
    es4_document.timestamp -= timedelta(microseconds=1)
    assert not gt_query.match_document(es4_document)
def test_match_timestamp_lt(es4_document: Es4Document) -> None:
    """The timestamp filter with the LT comparator

    The filter timestamp is expected to be one microsecond after the fixture document's.
    """

    threshold = datetime(2022, 5, 5, 8, 21, 11, 668994, tzinfo=timezone.utc)
    lt_query = Query(flt=QueryFilter(timestamp=threshold, timestamp_comp=Cmp.LT))

    assert lt_query.match_document(es4_document)

    # Move the document one microsecond forward; it must not match any more
    es4_document.timestamp += timedelta(microseconds=1)
    assert not lt_query.match_document(es4_document)
@pytest.mark.parametrize('cmp_usage', ('explicit', 'implicit'))
def test_match_content_length_eq(
    cmp_usage: Literal['explicit', 'implicit'],
    es4_document: Es4Document,
) -> None:
    """The content length filter with the EQ comparator, given explicitly or left as the default

    The fixture document is expected to have a content length of 4.
    """

    criteria = QueryFilter(content_length=4)

    if cmp_usage == 'explicit':
        criteria['content_length_comp'] = Cmp.EQ

    eq_query = Query(flt=criteria)

    assert eq_query.match_document(es4_document)

    # Content of a different length must not match
    es4_document.content = 'other'
    assert not eq_query.match_document(es4_document)
def test_match_content_length_gt(es4_document: Es4Document) -> None:
    """The content length filter with the GT comparator

    The fixture document is expected to have a content length of 4.
    """

    gt_query = Query(flt=QueryFilter(content_length=4, content_length_comp=Cmp.GT))

    # Content that is not longer than the threshold
    assert not gt_query.match_document(es4_document)

    # Longer content
    es4_document.content = 'other'
    assert gt_query.match_document(es4_document)
def test_match_content_length_lt(es4_document: Es4Document) -> None:
    """The content length filter with the LT comparator

    The fixture document is expected to have a content length of 4.
    """

    lt_query = Query(flt=QueryFilter(content_length=4, content_length_comp=Cmp.LT))

    # Content that is not shorter than the threshold
    assert not lt_query.match_document(es4_document)

    # Shorter content
    es4_document.content = 'see'
    assert lt_query.match_document(es4_document)
def test_query_filter_nothing_prediction(es4_document: Es4Document) -> None:
    """A query predicting ``NOTHING`` returns an empty list"""

    empty_query = Query(limit=0)
    result = empty_query({es4_document: {}})

    assert not result
@pytest.mark.parametrize('cmp_usage', ('implicit', 'explicit'))
def test_query_filter_path_sort(
    cmp_usage: Literal['implicit', 'explicit'],
    es4_document: Es4Document,
    identity: Identity,
) -> None:
    """Ordering by path, ascending, with the direction given explicitly or left as the default"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))

    if cmp_usage == 'explicit':
        query = Query(order_by_field=OrderByField.PATH, order_direction=OrderDirection.ASC)
    else:
        query = Query(order_by_field=OrderByField.PATH)

    result = query({es4_document: {}, other_document: {}})

    assert result == [other_document, es4_document]
def test_query_filter_path_sort_desc(es4_document: Es4Document, identity: Identity) -> None:
    """Ordering by path, descending"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
    descending_query = Query(
        order_by_field=OrderByField.PATH,
        order_direction=OrderDirection.DESC,
    )

    result = descending_query({other_document: {}, es4_document: {}})

    assert result == [es4_document, other_document]
@pytest.mark.parametrize('cmp_usage', ('implicit', 'explicit'))
def test_query_filter_local_index_sort(
    cmp_usage: Literal['implicit', 'explicit'],
    es4_document: Es4Document,
    identity: Identity,
) -> None:
    """Ordering by local index, ascending, with the direction explicit or left as the default"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))

    if cmp_usage == 'explicit':
        query = Query(order_by_field=OrderByField.LOCAL_INDEX, order_direction=OrderDirection.ASC)
    else:
        query = Query(order_by_field=OrderByField.LOCAL_INDEX)

    result = query({es4_document: {'local_index': 2}, other_document: {'local_index': 1}})

    assert result == [
        other_document,
        es4_document,
    ]
def test_query_filter_local_index_sort_desc(es4_document: Es4Document, identity: Identity) -> None:
    """Ordering by local index, descending"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
    descending_query = Query(
        order_by_field=OrderByField.LOCAL_INDEX,
        order_direction=OrderDirection.DESC,
    )

    result = descending_query(
        {other_document: {'local_index': 1}, es4_document: {'local_index': 2}}
    )

    assert result == [
        es4_document,
        other_document,
    ]
def test_start_after_path_asc(es4_document: Es4Document, identity: Identity) -> None:
    """Starting after a path, with ascending order, skips the document at that path"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
    paging_query = Query(order_by_field=OrderByField.PATH, start_after_path=Path('/much.txt'))

    result = paging_query({other_document: {}, es4_document: {}})

    assert result == [es4_document]
def test_start_after_path_desc(es4_document: Es4Document, identity: Identity) -> None:
    """Starting after a path, with descending order, skips the document at that path"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
    paging_query = Query(
        order_by_field=OrderByField.PATH,
        order_direction=OrderDirection.DESC,
        start_after_path=Path('/test.txt'),
    )

    result = paging_query({other_document: {}, es4_document: {}})

    assert result == [other_document]
def test_start_after_local_index_asc(es4_document: Es4Document, identity: Identity) -> None:
    """Starting after a local index, with ascending order, skips the document at that index"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
    paging_query = Query(order_by_field=OrderByField.LOCAL_INDEX, start_after_local_index=1)

    result = paging_query({other_document: {'local_index': 2}, es4_document: {'local_index': 1}})

    assert result == [
        other_document
    ]
def test_start_after_local_index_desc(es4_document: Es4Document, identity: Identity) -> None:
    """Starting after a local index, with descending order, skips the document at that index"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))
    paging_query = Query(
        order_by_field=OrderByField.LOCAL_INDEX,
        order_direction=OrderDirection.DESC,
        start_after_local_index=2,
    )

    result = paging_query({other_document: {'local_index': 1}, es4_document: {'local_index': 2}})

    assert result == [
        other_document
    ]
def test_start_after_with_empty_output() -> None:
    """A start position combined with no documents at all yields an empty list"""

    paging_query = Query(start_after_path=Path('/test.txt'))
    result = paging_query({})

    assert result == []
@pytest.mark.parametrize(
    'use_start_after',
    (True, False),
    ids=('with_start_after', 'without_start_after'),
)
def test_limit(use_start_after: bool, es4_document: Es4Document, identity: Identity) -> None:
    """Test if document count limiting works

    With a limit of one, only the first document of the sorted output is returned; when a start
    position is also given, it is the first document past that position instead.
    """

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'))

    if use_start_after:
        query = Query(start_after_path=Path('/much.txt'), limit=1)
        expected = es4_document
    else:
        query = Query(limit=1)
        expected = other_document

    assert query({es4_document: {}, other_document: {}}) == [expected]
def test_byte_limiting_not_reached(es4_document: Es4Document, identity: Identity) -> None:
    """A byte limit above the total content size leaves the result untouched"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'), content='much')
    roomy_query = Query(limit_bytes=15)

    result = roomy_query({es4_document: {}, other_document: {}})

    assert result == [other_document, es4_document]
def test_byte_limiting(es4_document: Es4Document, identity: Identity) -> None:
    """A byte limit cuts the result before the document that would exceed it"""

    other_document = Es4Document(identity, es4_document.share, Path('/much.txt'), content='much')
    tight_query = Query(limit_bytes=5)

    result = tight_query({es4_document: {}, other_document: {}})

    assert result == [other_document]
def test_byte_limiting_with_start_after(es4_document: Es4Document, identity: Identity) -> None:
    """Test if byte limiting works as expected when not starting from the beginning

    The byte budget must be counted from the first document past the start position, not from
    the beginning of the sorted output.
    """

    other_document1 = Es4Document(identity, es4_document.share, Path('/much1.txt'), content='much1')
    other_document2 = Es4Document(identity, es4_document.share, Path('/much2.txt'), content='much2')
    # The start position is a Path, as the signature of Query demands, not a plain string
    query = Query(limit_bytes=5, start_after_path=Path('/much1.txt'))

    assert query({es4_document: {}, other_document1: {}, other_document2: {}}) == [other_document2]