Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
352 changes: 240 additions & 112 deletions nucliadb/src/nucliadb/search/search/find.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/search/search/find_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
from nucliadb.common.external_index_providers.base import TextBlockMatch
from nucliadb.common.ids import ParagraphId, VectorId
from nucliadb.search import SERVICE_NAME, logger
from nucliadb.search.search.cut import cut_page
from nucliadb.search.search.hydrator import (
ResourceHydrationOptions,
TextBlockHydrationOptions,
Expand All @@ -41,6 +40,7 @@
text_block_to_find_paragraph,
)
from nucliadb.search.search.merge import merge_relations_results
from nucliadb.search.search.plan.cut import cut_page
from nucliadb.search.search.query_parser.models import UnitRetrieval
from nucliadb.search.search.rank_fusion import IndexSource, RankFusionAlgorithm
from nucliadb.search.search.rerankers import (
Expand Down
2 changes: 1 addition & 1 deletion nucliadb/src/nucliadb/search/search/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@
from nucliadb.common.models_utils import from_proto
from nucliadb.common.models_utils.from_proto import RelationTypePbMap
from nucliadb.search.search import cache
from nucliadb.search.search.cut import cut_page
from nucliadb.search.search.fetch import (
fetch_resources,
get_labels_paragraph,
get_labels_resource,
get_seconds_paragraph,
)
from nucliadb.search.search.plan.cut import cut_page
from nucliadb.search.search.query_parser.models import FulltextQuery, UnitRetrieval
from nucliadb_models.common import FieldTypeName
from nucliadb_models.labels import translate_system_to_alias_label
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,3 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

from typing import TypeVar

T = TypeVar("T")


def cut_page(items: list[T], top_k: int) -> tuple[list[T], bool]:
"""Return a slice of `items` representing the specified page and a boolean
indicating whether there is a next page or not"""
next_page = len(items) > top_k
return items[:top_k], next_page
56 changes: 56 additions & 0 deletions nucliadb/src/nucliadb/search/search/plan/cut.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

from typing import TypeVar, Union

from typing_extensions import Self

from nucliadb.common.external_index_providers.base import TextBlockMatch

from .models import ExecutionContext, PlanStep

T = TypeVar("T")


class Cut(PlanStep):
def __init__(self, page: int):
self.page = page

def plan(self, uses: PlanStep[list[TextBlockMatch]]) -> Self:
self.source = uses
return self

async def execute(self, context: ExecutionContext) -> list[TextBlockMatch]:
text_blocks = await self.source.execute(context)
page, next_page = cut_page(text_blocks, self.page)
context.next_page = next_page
return page

def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
return {
"Cut": self.source.explain(),
}


def cut_page(items: list[T], top_k: int) -> tuple[list[T], bool]:
"""Return a slice of `items` representing the specified page and a boolean
indicating whether there is a next page or not"""
next_page = len(items) > top_k
return items[:top_k], next_page
116 changes: 116 additions & 0 deletions nucliadb/src/nucliadb/search/search/plan/hydrate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import asyncio
from typing import Union

from typing_extensions import Self

from nucliadb.common.external_index_providers.base import TextBlockMatch
from nucliadb.search import SERVICE_NAME
from nucliadb.search.search.hydrator import (
ResourceHydrationOptions,
TextBlockHydrationOptions,
hydrate_resource_metadata,
hydrate_text_block,
)
from nucliadb_models.resource import Resource

from .models import ExecutionContext, PlanStep


class HydrateTextBlocks(PlanStep):
def __init__(
self,
kbid: str,
hydration_options: TextBlockHydrationOptions,
max_ops: asyncio.Semaphore,
):
self.kbid = kbid
self.hydration_options = hydration_options
self.max_ops = max_ops

def plan(self, uses: PlanStep[list[TextBlockMatch]]) -> Self:
self.source = uses
return self

async def execute(self, context: ExecutionContext) -> list[TextBlockMatch]:
text_blocks = await self.source.execute(context)

ops = []
for text_block in text_blocks:
ops.append(
asyncio.create_task(
hydrate_text_block(
self.kbid,
text_block,
self.hydration_options,
concurrency_control=self.max_ops,
)
)
)

# TODO: metrics
hydrated = await asyncio.gather(*ops)
return hydrated

def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
return {
"HydrateTextBlocks": self.source.explain(),
}


class HydrateResources(PlanStep):
def __init__(
self, kbid: str, hydration_options: ResourceHydrationOptions, max_ops: asyncio.Semaphore
):
self.kbid = kbid
self.hydration_options = hydration_options
self.max_ops = max_ops

def plan(self, uses: PlanStep[list[TextBlockMatch]]) -> Self:
self.source = uses
return self

async def execute(self, context: ExecutionContext) -> list[Resource]:
text_blocks = await self.source.execute(context)

ops = {}
for text_block in text_blocks:
rid = text_block.paragraph_id.rid

if rid not in ops:
ops[rid] = asyncio.create_task(
hydrate_resource_metadata(
self.kbid,
rid,
options=self.hydration_options,
concurrency_control=self.max_ops,
service_name=SERVICE_NAME,
)
)
# TODO: metrics
hydrated = await asyncio.gather(*ops.values())
resources = [resource for resource in hydrated if resource is not None]
return resources

def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
return {
"HydrateResources": self.source.explain(),
}
103 changes: 103 additions & 0 deletions nucliadb/src/nucliadb/search/search/plan/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Copyright (C) 2021 Bosutech XXI S.L.
#
# nucliadb is offered under the AGPL v3.0 and as commercial software.
# For commercial licensing, contact us at [email protected].
#
# AGPL:
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar, Union

from nucliadb.common.external_index_providers.base import TextBlockMatch
from nucliadb_models.resource import Resource

# execution context (filled while running the plan)


@dataclass
class ExecutionContext:
# nidx search failed on some shards and results aren't from the whole corpus
nidx_incomplete: bool = False

# nidx shards queried during retrieval
nidx_queried_shards: Optional[list[str]] = None

# whether a query with a greater top_k would have returned more results
next_page: bool = False

# number of keyword results obtained from the index
keyword_result_count: int = 0

# list of exact matches during keyword search
keyword_exact_matches: Optional[list[str]] = None


# query plan step outputs


@dataclass
class IndexResult:
keyword: list[TextBlockMatch]
semantic: list[TextBlockMatch]
graph: list[TextBlockMatch]


@dataclass
class BestMatchesHydrated:
best_text_blocks: list[TextBlockMatch]
resources: list[Resource]
best_matches: list[str]


# query plan step logic


T = TypeVar("T")


class PlanStep(ABC, Generic[T]):
@abstractmethod
async def execute(self, context: ExecutionContext) -> T:
pass

@abstractmethod
def explain(self) -> Union[str, dict[str, Union[str, dict]]]:
return {self.__class__.__name__: "(explain not implemented)"}


class Plan(Generic[T]):
def __init__(self, step: PlanStep[T]):
self._context = ExecutionContext()
self.step = step

async def execute(self) -> T:
return await self.step.execute(self._context)

def explain(self) -> None:
def _print_plan(plan: Union[str, Union[str, dict]], offset: int = 0):
if isinstance(plan, dict):
for key, value in plan.items():
print(" " * offset, "-", key)
_print_plan(value, offset + 2)
else:
print(" " * offset, "-", plan)

plan = self.step.explain()
_print_plan(plan)

@property
def context(self) -> ExecutionContext:
return self._context
Loading
Loading