Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Evaluated
* [PUFFINN](https://github.com/puffinn/puffinn) ![https://img.shields.io/github/stars/puffinn/puffinn?style=social](https://img.shields.io/github/stars/puffinn/puffinn?style=social)
* [N2](https://github.com/kakao/n2) ![https://img.shields.io/github/stars/kakao/n2?style=social](https://img.shields.io/github/stars/kakao/n2?style=social)
* [ScaNN](https://github.com/google-research/google-research/tree/master/scann)
* [Elasticsearch](https://github.com/elastic/elasticsearch) ![https://img.shields.io/github/stars/elastic/elasticsearch?style=social](https://img.shields.io/github/stars/elastic/elasticsearch?style=social): HNSW
* [Elastiknn](https://github.com/alexklibisz/elastiknn) ![https://img.shields.io/github/stars/alexklibisz/elastiknn?style=social](https://img.shields.io/github/stars/alexklibisz/elastiknn?style=social)
* [OpenSearch KNN](https://github.com/opensearch-project/k-NN) ![https://img.shields.io/github/stars/opensearch-project/k-NN?style=social](https://img.shields.io/github/stars/opensearch-project/k-NN?style=social)
* [DiskANN](https://github.com/microsoft/diskann) ![https://img.shields.io/github/stars/microsoft/diskann?style=social](https://img.shields.io/github/stars/microsoft/diskann?style=social): Vamana, Vamana-PQ
Expand Down
20 changes: 12 additions & 8 deletions algos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -824,11 +824,13 @@ float:
elasticsearch:
docker-tag: ann-benchmarks-elasticsearch
module: ann_benchmarks.algorithms.elasticsearch
constructor: ElasticsearchScriptScoreQuery
base-args: [ "@metric", "@dimension" ]
constructor: ElasticsearchKNN
base-args: ["@metric", "@dimension"]
run-groups:
empty:
args: []
m-16-ef-100:
arg-groups:
- {"m": 16, "ef_construction": 100} # index_options
query-args: [[10, 20, 40, 80, 160]] # num_candidates
elastiknn-l2lsh:
docker-tag: ann-benchmarks-elastiknn
module: ann_benchmarks.algorithms.elastiknn
Expand Down Expand Up @@ -1143,11 +1145,13 @@ float:
elasticsearch:
docker-tag: ann-benchmarks-elasticsearch
module: ann_benchmarks.algorithms.elasticsearch
constructor: ElasticsearchScriptScoreQuery
base-args: [ "@metric", "@dimension" ]
constructor: ElasticsearchKNN
base-args: ["@metric", "@dimension"]
run-groups:
empty:
args: []
m-16-ef-100:
arg-groups:
- {"m": 16, "ef_construction": 100} # index_options
query-args: [[10, 20, 40, 80, 160]] # num_candidates
opensearchknn:
docker-tag: ann-benchmarks-opensearchknn
module: ann_benchmarks.algorithms.opensearchknn
Expand Down
155 changes: 88 additions & 67 deletions ann_benchmarks/algorithms/elasticsearch.py
Original file line number Diff line number Diff line change
@@ -1,99 +1,120 @@
"""
ann-benchmarks interfaces for Elasticsearch.
Note that this requires X-Pack, which is not included in the OSS version of Elasticsearch.
"""
import logging
from time import sleep
from urllib.error import URLError
from urllib.request import Request, urlopen

from elasticsearch import Elasticsearch
from elasticsearch import ConnectionError, Elasticsearch
from elasticsearch.helpers import bulk

from .base import BaseANN

# Configure the elasticsearch logger.
# By default, it writes an INFO statement for every request.
logging.getLogger("elasticsearch").setLevel(logging.WARN)

# Uncomment these lines if you want to see timing for every HTTP request and its duration.
# logging.basicConfig(level=logging.INFO)
# logging.getLogger("elasticsearch").setLevel(logging.INFO)
class ElasticsearchKNN(BaseANN):
"""Elasticsearch KNN search.


def es_wait():
    """Block until the local Elasticsearch health endpoint answers with HTTP 200.

    Polls ``http://localhost:9200/_cluster/health`` (asking the server to wait
    up to one second for ``yellow`` status) at most 30 times, pausing one
    second after every unsuccessful attempt.

    Raises:
        RuntimeError: if no attempt received an HTTP 200 response.
    """
    print("Waiting for elasticsearch health endpoint...")
    req = Request("http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=1s")
    for _ in range(30):
        try:
            # Use the response as a context manager so its socket is closed on
            # every attempt instead of being leaked until garbage collection.
            with urlopen(req) as res:
                if res.getcode() == 200:
                    print("Elasticsearch is ready")
                    return
        except URLError:
            # Server not reachable yet (HTTPError is a URLError subclass, so
            # error status codes are retried as well).
            pass
        sleep(1)
    raise RuntimeError("Failed to connect to local elasticsearch")


class ElasticsearchScriptScoreQuery(BaseANN):
"""
KNN using the Elasticsearch dense_vector datatype and script score functions.
- Dense vector field type: https://www.elastic.co/guide/en/elasticsearch/reference/master/dense-vector.html
- Dense vector queries: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-script-score-query.html
See https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html for more details.
"""

def __init__(self, metric: str, dimension: int):
self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}"
def __init__(self, metric: str, dimension: int, index_options: dict):
self.metric = metric
self.dimension = dimension
self.index = f"es-ssq-{metric}-{dimension}"
self.es = Elasticsearch(["http://localhost:9200"])
self.index_options = index_options
self.num_candidates = 100

index_options_str = "-".join(sorted(f"{k}-{v}" for k, v in self.index_options.items()))
self.name = f"es-{metric}-{dimension}-{index_options_str}"
self.similarity_metric = self._vector_similarity_metric(metric)

self.client = Elasticsearch(["http://localhost:9200"])
self.batch_res = []
if self.metric == "euclidean":
self.script = '1 / (1 + l2norm(params.query_vec, "vec"))'
elif self.metric == "angular":
self.script = '1.0 + cosineSimilarity(params.query_vec, "vec")'
else:
raise NotImplementedError(f"Not implemented for metric {self.metric}")
es_wait()
self._wait_for_health_status()

def _vector_similarity_metric(self, metric: str):
# `dot_product` is more efficient than `cosine`, but requires all vectors to be normalized
# to unit length. We opt for adaptability, some datasets might not be normalized.
supported_metrics = {
"angular": "cosine",
"euclidean": "l2_norm",
}
if metric not in supported_metrics:
raise NotImplementedError(f"{metric} is not implemented")
return supported_metrics[metric]

def _wait_for_health_status(self, wait_seconds=30, status="yellow"):
print("Waiting for Elasticsearch ...")
for _ in range(wait_seconds):
try:
health = self.client.cluster.health(wait_for_status=status, request_timeout=1)
print(f'Elasticsearch is ready: status={health["status"]}')
return
except ConnectionError:
pass
sleep(1)
raise RuntimeError("Failed to connect to Elasticsearch")

def fit(self, X):
body = dict(settings=dict(number_of_shards=1, number_of_replicas=0))
mapping = dict(
properties=dict(id=dict(type="keyword", store=True), vec=dict(type="dense_vector", dims=self.dimension))
)
self.es.indices.create(self.index, body=body)
self.es.indices.put_mapping(mapping, self.index)
settings = {
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": -1,
}
mappings = {
"properties": {
"id": {"type": "keyword", "store": True},
"vec": {
"type": "dense_vector",
"element_type": "float",
"dims": self.dimension,
"index": True,
"similarity": self.similarity_metric,
"index_options": {
"type": self.index_options.get("type", "hnsw"),
"m": self.index_options["m"],
"ef_construction": self.index_options["ef_construction"],
},
},
},
}
self.client.indices.create(index=self.name, settings=settings, mappings=mappings)

def gen():
for i, vec in enumerate(X):
yield {"_op_type": "index", "_index": self.index, "vec": vec.tolist(), "id": str(i + 1)}
yield {"_op_type": "index", "_index": self.name, "id": str(i), "vec": vec.tolist()}

(_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9)
assert len(errors) == 0, errors
print("Indexing ...")
(_, errors) = bulk(self.client, gen(), chunk_size=500, request_timeout=90)
if len(errors) != 0:
raise RuntimeError("Failed to index documents")

self.es.indices.refresh(self.index)
self.es.indices.forcemerge(self.index, max_num_segments=1)
print("Force merge index ...")
self.client.indices.forcemerge(index=self.name, max_num_segments=1, request_timeout=900)

print("Refreshing index ...")
self.client.indices.refresh(index=self.name, request_timeout=900)

def set_query_arguments(self, num_candidates):
    """Record ``num_candidates`` on the instance, replacing any previous value."""
    self.num_candidates = num_candidates

def query(self, q, n):
body = dict(
query=dict(
script_score=dict(
query=dict(match_all=dict()), script=dict(source=self.script, params=dict(query_vec=q.tolist()))
)
)
)
res = self.es.search(
index=self.index,
if n > self.num_candidates:
raise ValueError("n must be smaller than num_candidates")

body = {
"knn": {
"field": "vec",
"query_vector": q.tolist(),
"k": n,
"num_candidates": self.num_candidates,
}
}
res = self.client.search(
index=self.name,
body=body,
size=n,
_source=False,
docvalue_fields=["id"],
stored_fields="_none_",
filter_path=["hits.hits.fields.id"],
request_timeout=10,
)
return [int(h["fields"]["id"][0]) - 1 for h in res["hits"]["hits"]]
return [int(h["fields"]["id"][0]) for h in res["hits"]["hits"]]

def batch_query(self, X, n):
    """Run ``self.query`` once per element of ``X`` and keep the results.

    The queries are issued one after another, in iteration order. The list of
    per-query results is stored in ``self.batch_res``; nothing is returned.
    """
    results = []
    for vector in X:
        results.append(self.query(vector, n))
    # Assign only after every query has finished, so a failure part-way
    # through leaves the previous batch results untouched.
    self.batch_res = results
Expand Down
77 changes: 52 additions & 25 deletions install/Dockerfile.elasticsearch
Original file line number Diff line number Diff line change
@@ -1,45 +1,72 @@
FROM ann-benchmarks
FROM ann-benchmarks AS builder
ARG ELASTICSEARCH_VERSION=8.7.0

WORKDIR /home/app
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get install -y curl

# Download Elasticsearch to intermediate builder.
WORKDIR /tmp
RUN curl -OsS https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}-linux-$(arch).tar.gz
RUN curl -sS https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}-linux-$(arch).tar.gz.sha512 | shasum -a 512 -c -

WORKDIR /usr/share/elasticsearch
RUN tar -zxf /tmp/elasticsearch-${ELASTICSEARCH_VERSION}-linux-$(arch).tar.gz --strip-components=1

# Install Elasticsearch in final image:
# - https://www.elastic.co/guide/en/elasticsearch/reference/current/targz.html
# - https://www.elastic.co/guide/en/elasticsearch/reference/current/system-config.html
FROM ann-benchmarks
ARG ELASTICSEARCH_VERSION=8.7.0

# Install elasticsearch.
ENV DEBIAN_FRONTEND noninteractive
RUN apt install -y wget curl htop
RUN wget --quiet https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.2-amd64.deb \
&& dpkg -i elasticsearch-7.9.2-amd64.deb \
&& rm elasticsearch-7.9.2-amd64.deb
RUN apt-get install -y curl

WORKDIR /usr/share/elasticsearch

# Install python client.
RUN python3 -m pip install --upgrade elasticsearch==7.9.1
# Create elasticsearch user and user group.
RUN groupadd -g 1000 elasticsearch
RUN adduser --uid 1000 --gid 1000 --home /usr/share/elasticsearch elasticsearch

# Configure elasticsearch and JVM for single-node, single-core.
COPY --from=builder --chown=elasticsearch:elasticsearch /usr/share/elasticsearch /usr/share/elasticsearch

RUN echo "vm.max_map_count=262144" >> /etc/sysctl.conf

# Backup original configurations for potential future reference.
RUN cp config/elasticsearch.yml config/elasticsearch.yml.bak
RUN cp config/jvm.options config/jvm.options.bak

# Configure Elasticsearch for single-node, single-core.
RUN echo '\
discovery.type: single-node\n\
network.host: 0.0.0.0\n\
node.master: true\n\
node.data: true\n\
node.roles: [master, data]\n\
node.processors: 1\n\
path.data: /usr/share/elasticsearch/data\n\
path.logs: /usr/share/elasticsearch/logs\n\
bootstrap.memory_lock: true\n\
thread_pool.write.size: 1\n\
thread_pool.search.size: 1\n\
thread_pool.search.queue_size: 1\n\
path.data: /var/lib/elasticsearch\n\
path.logs: /var/log/elasticsearch\n\
' > /etc/elasticsearch/elasticsearch.yml
xpack.security.enabled: false\n\
' > config/elasticsearch.yml

RUN echo '\
-Xms3G\n\
-Xmx3G\n\
-XX:+UseG1GC\n\
-XX:G1ReservePercent=25\n\
-XX:InitiatingHeapOccupancyPercent=30\n\
-XX:+HeapDumpOnOutOfMemoryError\n\
-XX:HeapDumpPath=/var/lib/elasticsearch\n\
-XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log\n\
-Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:utctime,pid,tags:filecount=32,filesize=64m' > /etc/elasticsearch/jvm.options
-XX:HeapDumpPath=data\n\
-XX:ErrorFile=/usr/share/elasticsearch/logs/hs_err_pid%p.log\n\
-Xlog:gc*,gc+age=trace,safepoint:file=/usr/share/elasticsearch/logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m\n\
' > config/jvm.options

# Make sure you can start the service.
RUN service elasticsearch start && service elasticsearch stop
RUN chown -R elasticsearch:elasticsearch /usr/share/elasticsearch

WORKDIR /home/app

RUN python3 -m pip install elasticsearch==${ELASTICSEARCH_VERSION}

# Custom entrypoint that also starts the Elasticsearch server.
RUN echo 'service elasticsearch start && python3 -u run_algorithm.py "$@"' > entrypoint.sh
RUN echo 'set -eux' >> entrypoint.sh
RUN echo 'su - elasticsearch -c "nohup /usr/share/elasticsearch/bin/elasticsearch > nohup.out 2>&1 &"' >> entrypoint.sh
RUN echo 'python3 -u run_algorithm.py "$@"' >> entrypoint.sh

ENTRYPOINT ["/bin/bash", "/home/app/entrypoint.sh"]