Skip to content

Commit 9857fe0

Browse files
committed
Basic implementation of a query planner for /find
1 parent 497a620 commit 9857fe0

File tree

15 files changed

+1010
-132
lines changed

15 files changed

+1010
-132
lines changed

nucliadb/src/nucliadb/search/search/find.py

Lines changed: 240 additions & 112 deletions
Large diffs are not rendered by default.

nucliadb/src/nucliadb/search/search/find_merge.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
from nucliadb.common.external_index_providers.base import TextBlockMatch
3333
from nucliadb.common.ids import ParagraphId, VectorId
3434
from nucliadb.search import SERVICE_NAME, logger
35-
from nucliadb.search.search.cut import cut_page
3635
from nucliadb.search.search.hydrator import (
3736
ResourceHydrationOptions,
3837
TextBlockHydrationOptions,
@@ -41,6 +40,7 @@
4140
text_block_to_find_paragraph,
4241
)
4342
from nucliadb.search.search.merge import merge_relations_results
43+
from nucliadb.search.search.plan.cut import cut_page
4444
from nucliadb.search.search.query_parser.models import UnitRetrieval
4545
from nucliadb.search.search.rank_fusion import IndexSource, RankFusionAlgorithm
4646
from nucliadb.search.search.rerankers import (

nucliadb/src/nucliadb/search/search/merge.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,13 @@
3838
from nucliadb.common.models_utils import from_proto
3939
from nucliadb.common.models_utils.from_proto import RelationTypePbMap
4040
from nucliadb.search.search import cache
41-
from nucliadb.search.search.cut import cut_page
4241
from nucliadb.search.search.fetch import (
4342
fetch_resources,
4443
get_labels_paragraph,
4544
get_labels_resource,
4645
get_seconds_paragraph,
4746
)
47+
from nucliadb.search.search.plan.cut import cut_page
4848
from nucliadb.search.search.query_parser.models import FulltextQuery, UnitRetrieval
4949
from nucliadb_models.common import FieldTypeName
5050
from nucliadb_models.labels import translate_system_to_alias_label

nucliadb/src/nucliadb/search/search/cut.py renamed to nucliadb/src/nucliadb/search/search/plan/__init__.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,3 @@
1717
# You should have received a copy of the GNU Affero General Public License
1818
# along with this program. If not, see <http://www.gnu.org/licenses/>.
1919
#
20-
21-
from typing import TypeVar
22-
23-
T = TypeVar("T")
24-
25-
26-
def cut_page(items: list[T], top_k: int) -> tuple[list[T], bool]:
    """Cut `items` down to its first `top_k` elements.

    Returns the truncated list together with a flag that is True when `items`
    held more than `top_k` elements, i.e. when there is a next page.
    """
    has_next_page = top_k < len(items)
    page = items[:top_k]
    return page, has_next_page
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Copyright (C) 2021 Bosutech XXI S.L.
2+
#
3+
# nucliadb is offered under the AGPL v3.0 and as commercial software.
4+
# For commercial licensing, contact us at info@nuclia.com.
5+
#
6+
# AGPL:
7+
# This program is free software: you can redistribute it and/or modify
8+
# it under the terms of the GNU Affero General Public License as
9+
# published by the Free Software Foundation, either version 3 of the
10+
# License, or (at your option) any later version.
11+
#
12+
# This program is distributed in the hope that it will be useful,
13+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
# GNU Affero General Public License for more details.
16+
#
17+
# You should have received a copy of the GNU Affero General Public License
18+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
#
20+
21+
from typing import TypeVar, Union
22+
23+
from typing_extensions import Self
24+
25+
from nucliadb.common.external_index_providers.base import TextBlockMatch
26+
27+
from .models import ExecutionContext, PlanStep
28+
29+
T = TypeVar("T")
30+
31+
32+
class Cut(PlanStep):
    """Plan step that truncates the text blocks of its source step to one page.

    The source step is attached with `plan()`; `execute()` and `explain()`
    read it, so they can only be used once `plan()` has been called.
    """

    def __init__(self, page: int):
        # maximum number of text blocks kept; forwarded to `cut_page` as `top_k`
        self.page = page

    def plan(self, uses: PlanStep[list[TextBlockMatch]]) -> Self:
        """Attach the step whose output will be cut and return `self` so the
        call can be chained."""
        self.source = uses
        return self

    async def execute(self, context: ExecutionContext) -> list[TextBlockMatch]:
        """Run the source step and return at most `self.page` of its results.

        Whether some results were left out is recorded in `context.next_page`.
        """
        upstream = await self.source.execute(context)
        kept, has_more = cut_page(upstream, self.page)
        context.next_page = has_more
        return kept

    def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
        """Describe this step as a "Cut" node wrapping its source explanation."""
        source_explanation = self.source.explain()
        return {"Cut": source_explanation}
50+
51+
52+
def cut_page(items: list[T], top_k: int) -> tuple[list[T], bool]:
    """Cut `items` down to its first `top_k` elements.

    Returns the truncated list together with a flag that is True when `items`
    held more than `top_k` elements, i.e. when there is a next page.
    """
    has_next_page = top_k < len(items)
    page = items[:top_k]
    return page, has_next_page
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
# Copyright (C) 2021 Bosutech XXI S.L.
2+
#
3+
# nucliadb is offered under the AGPL v3.0 and as commercial software.
4+
# For commercial licensing, contact us at info@nuclia.com.
5+
#
6+
# AGPL:
7+
# This program is free software: you can redistribute it and/or modify
8+
# it under the terms of the GNU Affero General Public License as
9+
# published by the Free Software Foundation, either version 3 of the
10+
# License, or (at your option) any later version.
11+
#
12+
# This program is distributed in the hope that it will be useful,
13+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
# GNU Affero General Public License for more details.
16+
#
17+
# You should have received a copy of the GNU Affero General Public License
18+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
#
20+
import asyncio
21+
from typing import Union
22+
23+
from typing_extensions import Self
24+
25+
from nucliadb.common.external_index_providers.base import TextBlockMatch
26+
from nucliadb.search import SERVICE_NAME
27+
from nucliadb.search.search.hydrator import (
28+
ResourceHydrationOptions,
29+
TextBlockHydrationOptions,
30+
hydrate_resource_metadata,
31+
hydrate_text_block,
32+
)
33+
from nucliadb_models.resource import Resource
34+
35+
from .models import ExecutionContext, PlanStep
36+
37+
38+
class HydrateTextBlocks(PlanStep):
    """Plan step that hydrates every text block produced by its source step.

    The source step is attached with `plan()`; `execute()` and `explain()`
    read it, so they can only be used once `plan()` has been called.
    """

    def __init__(
        self,
        kbid: str,
        hydration_options: TextBlockHydrationOptions,
        max_ops: asyncio.Semaphore,
    ):
        self.kbid = kbid
        self.hydration_options = hydration_options
        # semaphore handed to every hydration call as its concurrency control
        self.max_ops = max_ops

    def plan(self, uses: PlanStep[list[TextBlockMatch]]) -> Self:
        """Attach the step providing the text blocks to hydrate and return
        `self` so the call can be chained."""
        self.source = uses
        return self

    async def execute(self, context: ExecutionContext) -> list[TextBlockMatch]:
        """Run the source step and hydrate each text block it returns.

        One task per text block is scheduled and all of them are awaited
        together; results come back in the same order as the source output.
        """
        matches = await self.source.execute(context)

        tasks = [
            asyncio.create_task(
                hydrate_text_block(
                    self.kbid,
                    match,
                    self.hydration_options,
                    concurrency_control=self.max_ops,
                )
            )
            for match in matches
        ]

        # TODO: metrics
        return await asyncio.gather(*tasks)

    def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
        """Describe this step as a "HydrateTextBlocks" node wrapping its source
        explanation."""
        source_explanation = self.source.explain()
        return {"HydrateTextBlocks": source_explanation}
77+
78+
79+
class HydrateResources(PlanStep):
    """Plan step that hydrates the resources referenced by the text blocks of
    its source step.

    The source step is attached with `plan()`; `execute()` and `explain()`
    read it, so they can only be used once `plan()` has been called.
    """

    def __init__(
        self, kbid: str, hydration_options: ResourceHydrationOptions, max_ops: asyncio.Semaphore
    ):
        self.kbid = kbid
        self.hydration_options = hydration_options
        # semaphore handed to every hydration call as its concurrency control
        self.max_ops = max_ops

    def plan(self, uses: PlanStep[list[TextBlockMatch]]) -> Self:
        """Attach the step providing the text blocks and return `self` so the
        call can be chained."""
        self.source = uses
        return self

    async def execute(self, context: ExecutionContext) -> list[Resource]:
        """Run the source step and hydrate each distinct resource it references.

        A single hydration task is scheduled per resource id, in order of first
        appearance. Resources whose hydration returned None are left out of the
        returned list.
        """
        matches = await self.source.execute(context)

        tasks = {}
        for match in matches:
            rid = match.paragraph_id.rid
            if rid in tasks:
                # this resource is already being hydrated
                continue
            tasks[rid] = asyncio.create_task(
                hydrate_resource_metadata(
                    self.kbid,
                    rid,
                    options=self.hydration_options,
                    concurrency_control=self.max_ops,
                    service_name=SERVICE_NAME,
                )
            )

        # TODO: metrics
        results = await asyncio.gather(*tasks.values())
        return [result for result in results if result is not None]

    def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
        """Describe this step as a "HydrateResources" node wrapping its source
        explanation."""
        source_explanation = self.source.explain()
        return {"HydrateResources": source_explanation}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Copyright (C) 2021 Bosutech XXI S.L.
2+
#
3+
# nucliadb is offered under the AGPL v3.0 and as commercial software.
4+
# For commercial licensing, contact us at info@nuclia.com.
5+
#
6+
# AGPL:
7+
# This program is free software: you can redistribute it and/or modify
8+
# it under the terms of the GNU Affero General Public License as
9+
# published by the Free Software Foundation, either version 3 of the
10+
# License, or (at your option) any later version.
11+
#
12+
# This program is distributed in the hope that it will be useful,
13+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
# GNU Affero General Public License for more details.
16+
#
17+
# You should have received a copy of the GNU Affero General Public License
18+
# along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
#
20+
from abc import ABC, abstractmethod
21+
from dataclasses import dataclass
22+
from typing import Generic, Optional, TypeVar, Union
23+
24+
from nucliadb.common.external_index_providers.base import TextBlockMatch
25+
from nucliadb_models.resource import Resource
26+
27+
# execution context (filled while running the plan)
28+
29+
30+
@dataclass
class ExecutionContext:
    """State filled in while a plan runs; every field starts from a default."""

    # True when nidx search failed on some shards, so results do not come
    # from the whole corpus
    nidx_incomplete: bool = False

    # nidx shards queried during retrieval (None until set)
    nidx_queried_shards: Optional[list[str]] = None

    # True when a query with a greater top_k would have returned more results
    next_page: bool = False

    # number of keyword results obtained from the index
    keyword_result_count: int = 0

    # exact matches found during keyword search (None until set)
    keyword_exact_matches: Optional[list[str]] = None
46+
47+
48+
# query plan step outputs
49+
50+
51+
@dataclass
class IndexResult:
    """Plan step output made of three lists of text block matches.

    NOTE(review): presumably one list per kind of index search (keyword,
    semantic and graph) -- confirm against the step that builds it.
    """

    keyword: list[TextBlockMatch]
    semantic: list[TextBlockMatch]
    graph: list[TextBlockMatch]
56+
57+
58+
@dataclass
class BestMatchesHydrated:
    """Plan step output bundling text blocks, resources and best matches.

    NOTE(review): presumably `best_matches` holds identifiers of the entries
    in `best_text_blocks` -- confirm against the step that builds it.
    """

    best_text_blocks: list[TextBlockMatch]
    resources: list[Resource]
    best_matches: list[str]
63+
64+
65+
# query plan step logic
66+
67+
68+
T = TypeVar("T")
69+
70+
71+
class PlanStep(ABC, Generic[T]):
    """Abstract base class for query plan steps; `T` is the type returned by
    `execute`.

    Both methods are abstract, so concrete steps must override them.
    """

    @abstractmethod
    async def execute(self, context: ExecutionContext) -> T:
        """Run the step with the given execution context and return its
        output."""

    @abstractmethod
    def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
        """Return a description of the step, either a string or a nested dict.

        This base implementation is only reachable through `super()` and
        returns a placeholder keyed by the class name.
        """
        step_name = self.__class__.__name__
        return {step_name: "(explain not implemented)"}
79+
80+
81+
class Plan(Generic[T]):
    """A runnable plan: a root step plus the execution context it fills."""

    def __init__(self, step: PlanStep[T]):
        self.step = step
        # fresh context created with the plan and passed to the root step on execute
        self._context = ExecutionContext()

    @property
    def context(self) -> ExecutionContext:
        """Execution context passed to the root step when the plan runs."""
        return self._context

    async def execute(self) -> T:
        """Run the root step with this plan's context and return its output."""
        return await self.step.execute(self._context)

    def explain(self) -> None:
        """Print the explanation of the root step to stdout as an indented tree.

        Dict keys are printed one per line and their values are printed
        beneath them with two more spaces of indentation.
        """

        def _render(node: Union[str, dict], indent: int = 0):
            padding = " " * indent
            if not isinstance(node, dict):
                # leaf: print the value itself
                print(padding, "-", node)
                return
            for label, child in node.items():
                print(padding, "-", label)
                _render(child, indent + 2)

        _render(self.step.explain())

0 commit comments

Comments
 (0)