diff --git a/README.md b/README.md index 993e835b9..af4e981fd 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ Evaluated * [PUFFINN](https://github.com/puffinn/puffinn) ![https://img.shields.io/github/stars/puffinn/puffinn?style=social](https://img.shields.io/github/stars/puffinn/puffinn?style=social) * [N2](https://github.com/kakao/n2) ![https://img.shields.io/github/stars/kakao/n2?style=social](https://img.shields.io/github/stars/kakao/n2?style=social) * [ScaNN](https://github.com/google-research/google-research/tree/master/scann) +* [Elasticsearch](https://github.com/elastic/elasticsearch) ![https://img.shields.io/github/stars/elastic/elasticsearch?style=social](https://img.shields.io/github/stars/elastic/elasticsearch?style=social): HNSW * [Elastiknn](https://github.com/alexklibisz/elastiknn) ![https://img.shields.io/github/stars/alexklibisz/elastiknn?style=social](https://img.shields.io/github/stars/alexklibisz/elastiknn?style=social) * [OpenSearch KNN](https://github.com/opensearch-project/k-NN) ![https://img.shields.io/github/stars/opensearch-project/k-NN?style=social](https://img.shields.io/github/stars/opensearch-project/k-NN?style=social) * [DiskANN](https://github.com/microsoft/diskann) ![https://img.shields.io/github/stars/microsoft/diskann?style=social](https://img.shields.io/github/stars/microsoft/diskann?style=social): Vamana, Vamana-PQ diff --git a/algos.yaml b/algos.yaml index 317de7a05..1d823a1a2 100644 --- a/algos.yaml +++ b/algos.yaml @@ -824,11 +824,13 @@ float: elasticsearch: docker-tag: ann-benchmarks-elasticsearch module: ann_benchmarks.algorithms.elasticsearch - constructor: ElasticsearchScriptScoreQuery - base-args: [ "@metric", "@dimension" ] + constructor: ElasticsearchKNN + base-args: ["@metric", "@dimension"] run-groups: - empty: - args: [] + m-16-ef-100: + arg-groups: + - {"m": 16, "ef_construction": 100} # index_options + query-args: [[10, 20, 40, 80, 160]] # num_candidates elastiknn-l2lsh: docker-tag: ann-benchmarks-elastiknn module: ann_benchmarks.algorithms.elastiknn @@ -1143,11 +1145,13 @@ float: elasticsearch: docker-tag: ann-benchmarks-elasticsearch module: ann_benchmarks.algorithms.elasticsearch - constructor: ElasticsearchScriptScoreQuery - base-args: [ "@metric", "@dimension" ] + constructor: ElasticsearchKNN + base-args: ["@metric", "@dimension"] run-groups: - empty: - args: [] + m-16-ef-100: + arg-groups: + - {"m": 16, "ef_construction": 100} # index_options + query-args: [[10, 20, 40, 80, 160]] # num_candidates opensearchknn: docker-tag: ann-benchmarks-opensearchknn module: ann_benchmarks.algorithms.opensearchknn diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 56d6f7f8a..e1893b85a 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -1,99 +1,120 @@ -""" -ann-benchmarks interfaces for Elasticsearch. -Note that this requires X-Pack, which is not included in the OSS version of Elasticsearch. -""" -import logging from time import sleep -from urllib.error import URLError -from urllib.request import Request, urlopen -from elasticsearch import Elasticsearch +from elasticsearch import ConnectionError, Elasticsearch from elasticsearch.helpers import bulk from .base import BaseANN -# Configure the elasticsearch logger. -# By default, it writes an INFO statement for every request. -logging.getLogger("elasticsearch").setLevel(logging.WARN) -# Uncomment these lines if you want to see timing for every HTTP request and its duration. -# logging.basicConfig(level=logging.INFO) -# logging.getLogger("elasticsearch").setLevel(logging.INFO) +class ElasticsearchKNN(BaseANN): + """Elasticsearch KNN search. - -def es_wait(): - print("Waiting for elasticsearch health endpoint...") - req = Request("http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=1s") - for i in range(30): - try: - res = urlopen(req) - if res.getcode() == 200: - print("Elasticsearch is ready") - return - except URLError: - pass - sleep(1) - raise RuntimeError("Failed to connect to local elasticsearch") - - -class ElasticsearchScriptScoreQuery(BaseANN): - """ - KNN using the Elasticsearch dense_vector datatype and script score functions. - - Dense vector field type: https://www.elastic.co/guide/en/elasticsearch/reference/master/dense-vector.html - - Dense vector queries: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-script-score-query.html + See https://www.elastic.co/guide/en/elasticsearch/reference/current/knn-search.html for more details. """ - def __init__(self, metric: str, dimension: int): - self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}" + def __init__(self, metric: str, dimension: int, index_options: dict): self.metric = metric self.dimension = dimension - self.index = f"es-ssq-{metric}-{dimension}" - self.es = Elasticsearch(["http://localhost:9200"]) + self.index_options = index_options + self.num_candidates = 100 + + index_options_str = "-".join(sorted(f"{k}-{v}" for k, v in self.index_options.items())) + self.name = f"es-{metric}-{dimension}-{index_options_str}" + self.similarity_metric = self._vector_similarity_metric(metric) + + self.client = Elasticsearch(["http://localhost:9200"]) self.batch_res = [] - if self.metric == "euclidean": - self.script = '1 / (1 + l2norm(params.query_vec, "vec"))' - elif self.metric == "angular": - self.script = '1.0 + cosineSimilarity(params.query_vec, "vec")' - else: - raise NotImplementedError(f"Not implemented for metric {self.metric}") - es_wait() + self._wait_for_health_status() + + def _vector_similarity_metric(self, metric: str): + # `dot_product` is more efficient than `cosine`, but requires all vectors to be normalized + # to unit length. We opt for adaptability, some datasets might not be normalized. + supported_metrics = { + "angular": "cosine", + "euclidean": "l2_norm", + } + if metric not in supported_metrics: + raise NotImplementedError(f"{metric} is not implemented") + return supported_metrics[metric] + + def _wait_for_health_status(self, wait_seconds=30, status="yellow"): + print("Waiting for Elasticsearch ...") + for _ in range(wait_seconds): + try: + health = self.client.cluster.health(wait_for_status=status, request_timeout=1) + print(f'Elasticsearch is ready: status={health["status"]}') + return + except ConnectionError: + pass + sleep(1) + raise RuntimeError("Failed to connect to Elasticsearch") def fit(self, X): - body = dict(settings=dict(number_of_shards=1, number_of_replicas=0)) - mapping = dict( - properties=dict(id=dict(type="keyword", store=True), vec=dict(type="dense_vector", dims=self.dimension)) - ) - self.es.indices.create(self.index, body=body) - self.es.indices.put_mapping(mapping, self.index) + settings = { + "number_of_shards": 1, + "number_of_replicas": 0, + "refresh_interval": -1, + } + mappings = { + "properties": { + "id": {"type": "keyword", "store": True}, + "vec": { + "type": "dense_vector", + "element_type": "float", + "dims": self.dimension, + "index": True, + "similarity": self.similarity_metric, + "index_options": { + "type": self.index_options.get("type", "hnsw"), + "m": self.index_options["m"], + "ef_construction": self.index_options["ef_construction"], + }, + }, + }, + } + self.client.indices.create(index=self.name, settings=settings, mappings=mappings) def gen(): for i, vec in enumerate(X): - yield {"_op_type": "index", "_index": self.index, "vec": vec.tolist(), "id": str(i + 1)} + yield {"_op_type": "index", "_index": self.name, "id": str(i), "vec": vec.tolist()} - (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) - assert len(errors) == 0, errors + print("Indexing ...") + (_, errors) = bulk(self.client, gen(), chunk_size=500, request_timeout=90) + if len(errors) != 0: + raise RuntimeError("Failed to index documents") - self.es.indices.refresh(self.index) - self.es.indices.forcemerge(self.index, max_num_segments=1) + print("Force merge index ...") + self.client.indices.forcemerge(index=self.name, max_num_segments=1, request_timeout=900) + + print("Refreshing index ...") + self.client.indices.refresh(index=self.name, request_timeout=900) + + def set_query_arguments(self, num_candidates): + self.num_candidates = num_candidates def query(self, q, n): - body = dict( - query=dict( - script_score=dict( - query=dict(match_all=dict()), script=dict(source=self.script, params=dict(query_vec=q.tolist())) - ) - ) - ) - res = self.es.search( - index=self.index, + if n > self.num_candidates: + raise ValueError("n must be smaller than num_candidates") + + body = { + "knn": { + "field": "vec", + "query_vector": q.tolist(), + "k": n, + "num_candidates": self.num_candidates, + } + } + res = self.client.search( + index=self.name, body=body, size=n, _source=False, docvalue_fields=["id"], stored_fields="_none_", filter_path=["hits.hits.fields.id"], + request_timeout=10, ) - return [int(h["fields"]["id"][0]) - 1 for h in res["hits"]["hits"]] + return [int(h["fields"]["id"][0]) for h in res["hits"]["hits"]] def batch_query(self, X, n): self.batch_res = [self.query(q, n) for q in X] diff --git a/install/Dockerfile.elasticsearch b/install/Dockerfile.elasticsearch index ed829a7af..c5a83b488 100644 --- a/install/Dockerfile.elasticsearch +++ b/install/Dockerfile.elasticsearch @@ -1,45 +1,72 @@ -FROM ann-benchmarks +FROM ann-benchmarks AS builder +ARG ELASTICSEARCH_VERSION=8.7.0 -WORKDIR /home/app +ENV DEBIAN_FRONTEND noninteractive +RUN apt-get install -y curl + +# Download Elasticsearch to intermediate builder. +WORKDIR /tmp +RUN curl -OsS https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}-linux-$(arch).tar.gz +RUN curl -sS https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-${ELASTICSEARCH_VERSION}-linux-$(arch).tar.gz.sha512 | shasum -a 512 -c - + +WORKDIR /usr/share/elasticsearch +RUN tar -zxf /tmp/elasticsearch-${ELASTICSEARCH_VERSION}-linux-$(arch).tar.gz --strip-components=1 + +# Install Elasticsearch in final image: +# - https://www.elastic.co/guide/en/elasticsearch/reference/current/targz.html +# - https://www.elastic.co/guide/en/elasticsearch/reference/current/system-config.html +FROM ann-benchmarks +ARG ELASTICSEARCH_VERSION=8.7.0 -# Install elasticsearch. ENV DEBIAN_FRONTEND noninteractive -RUN apt install -y wget curl htop -RUN wget --quiet https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.9.2-amd64.deb \ - && dpkg -i elasticsearch-7.9.2-amd64.deb \ - && rm elasticsearch-7.9.2-amd64.deb +RUN apt-get install -y curl + +WORKDIR /usr/share/elasticsearch -# Install python client. -RUN python3 -m pip install --upgrade elasticsearch==7.9.1 +# Create elasticsearch user and user group. +RUN groupadd -g 1000 elasticsearch +RUN adduser --uid 1000 --gid 1000 --home /usr/share/elasticsearch elasticsearch -# Configure elasticsearch and JVM for single-node, single-core. +COPY --from=builder --chown=elasticsearch:elasticsearch /usr/share/elasticsearch /usr/share/elasticsearch + +RUN echo "vm.max_map_count=262144" >> /etc/sysctl.conf + +# Backup original configurations for potential future reference. +RUN cp config/elasticsearch.yml config/elasticsearch.yml.bak +RUN cp config/jvm.options config/jvm.options.bak + +# Configure Elasticsearch for single-node, single-core. RUN echo '\ discovery.type: single-node\n\ -network.host: 0.0.0.0\n\ -node.master: true\n\ -node.data: true\n\ +node.roles: [master, data]\n\ node.processors: 1\n\ +path.data: /usr/share/elasticsearch/data\n\ +path.logs: /usr/share/elasticsearch/logs\n\ +bootstrap.memory_lock: true\n\ thread_pool.write.size: 1\n\ thread_pool.search.size: 1\n\ thread_pool.search.queue_size: 1\n\ -path.data: /var/lib/elasticsearch\n\ -path.logs: /var/log/elasticsearch\n\ -' > /etc/elasticsearch/elasticsearch.yml +xpack.security.enabled: false\n\ +' > config/elasticsearch.yml RUN echo '\ -Xms3G\n\ -Xmx3G\n\ -XX:+UseG1GC\n\ --XX:G1ReservePercent=25\n\ --XX:InitiatingHeapOccupancyPercent=30\n\ --XX:+HeapDumpOnOutOfMemoryError\n\ --XX:HeapDumpPath=/var/lib/elasticsearch\n\ --XX:ErrorFile=/var/log/elasticsearch/hs_err_pid%p.log\n\ --Xlog:gc*,gc+age=trace,safepoint:file=/var/log/elasticsearch/gc.log:utctime,pid,tags:filecount=32,filesize=64m' > /etc/elasticsearch/jvm.options +-XX:HeapDumpPath=data\n\ +-XX:ErrorFile=/usr/share/elasticsearch/logs/hs_err_pid%p.log\n\ +-Xlog:gc*,gc+age=trace,safepoint:file=/usr/share/elasticsearch/logs/gc.log:utctime,pid,tags:filecount=32,filesize=64m\n\ +' > config/jvm.options -# Make sure you can start the service. -RUN service elasticsearch start && service elasticsearch stop +RUN chown -R elasticsearch:elasticsearch /usr/share/elasticsearch + +WORKDIR /home/app + +RUN python3 -m pip install elasticsearch==${ELASTICSEARCH_VERSION} # Custom entrypoint that also starts the Elasticsearch server. -RUN echo 'service elasticsearch start && python3 -u run_algorithm.py "$@"' > entrypoint.sh +RUN echo 'set -eux' >> entrypoint.sh +RUN echo 'su - elasticsearch -c "nohup /usr/share/elasticsearch/bin/elasticsearch > nohup.out 2>&1 &"' >> entrypoint.sh +RUN echo 'python3 -u run_algorithm.py "$@"' >> entrypoint.sh + ENTRYPOINT ["/bin/bash", "/home/app/entrypoint.sh"]