From 9707e17f3d15c9626f4f2e58646baf48a3301434 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Mon, 11 Jul 2022 16:57:28 +0300 Subject: [PATCH 01/29] first changes --- algos.yaml | 46 ++++++++++++++---- ann_benchmarks/algorithms/elasticsearch.py | 56 +++++++++++----------- 2 files changed, 63 insertions(+), 39 deletions(-) diff --git a/algos.yaml b/algos.yaml index 231f08667..e3e294183 100644 --- a/algos.yaml +++ b/algos.yaml @@ -672,14 +672,6 @@ float: - {"n_neighbors": 60, "diversify_prob": 0.0, "pruning_degree_multiplier":[2.0, 3.0], "leaf_size": 48} query-args: [[0.0, 0.04, 0.08, 0.12, 0.16, 0.20, 0.24, 0.28, 0.32, 0.36]] - elasticsearch: - docker-tag: ann-benchmarks-elasticsearch - module: ann_benchmarks.algorithms.elasticsearch - constructor: ElasticsearchScriptScoreQuery - base-args: [ "@metric", "@dimension" ] - run-groups: - empty: - args: [] elastiknn-l2lsh: docker-tag: ann-benchmarks-elastiknn module: ann_benchmarks.algorithms.elastiknn @@ -976,8 +968,42 @@ float: constructor: ElasticsearchScriptScoreQuery base-args: [ "@metric", "@dimension" ] run-groups: - empty: - args: [] + M-4: + arg-groups: + - {"m": 4, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-8: + arg-groups: + - {"m": 8, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-12: + arg-groups: + - {"m": 12, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-16: + arg-groups: + - {"m": 16, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-24: + arg-groups: + - {"m": 24, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-36: + arg-groups: + - {"m": 36, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-48: + arg-groups: + - {"m": 48, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-64: + arg-groups: + - {"m": 64, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] + M-96: + arg-groups: + - {"m": 96, "ef_construction": 500, "type": "hnsw"} + query-args: [[10, 20, 40, 80, 120, 200, 400, 600, 800]] opensearchknn: docker-tag: ann-benchmarks-opensearchknn module: ann_benchmarks.algorithms.opensearchknn diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 95eccab09..dcb1de015 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -42,31 +42,36 @@ class ElasticsearchScriptScoreQuery(BaseANN): - Dense vector queries: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-script-score-query.html """ - def __init__(self, metric: str, dimension: int): - self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}" - self.metric = metric + def __init__(self, metric: str, dimension: int, method_param): + self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}_params{method_param}" + self.metric = {"euclidean": 'l2_norm', "angular": 'cosine'}[metric] + self.method_param = method_param self.dimension = dimension self.index = f"es-ssq-{metric}-{dimension}" self.es = Elasticsearch(["http://localhost:9200"]) self.batch_res = [] - if self.metric == "euclidean": - self.script = "1 / (1 + l2norm(params.query_vec, \"vec\"))" - elif self.metric == "angular": - self.script = "1.0 + cosineSimilarity(params.query_vec, \"vec\")" - else: - raise NotImplementedError(f"Not implemented for metric {self.metric}") es_wait() def fit(self, X): - body = dict(settings=dict(number_of_shards=1, number_of_replicas=0)) - mapping = dict( + mappings = dict( properties=dict( id=dict(type="keyword", store=True), - vec=dict(type="dense_vector", dims=self.dimension) + vec=dict( + type="dense_vector", + dims=self.dimension, + similarity=self.metric, + index=True, + index_options=self.method_param + ) ) ) - self.es.indices.create(self.index, body=body) - self.es.indices.put_mapping(mapping, self.index) + if self.es.indices.exists(index=self.index): + print('deleteing...', end=' ') + self.es.indices.delete(index=self.index) + print('done!') + self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0)) + + # self.es.indices.put_mapping(properties=properties, index=self.index) def gen(): for i, vec in enumerate(X): @@ -75,23 +80,16 @@ def gen(): (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) assert len(errors) == 0, errors - self.es.indices.refresh(self.index) - self.es.indices.forcemerge(self.index, max_num_segments=1) + self.es.indices.refresh(index=self.index) + self.es.indices.forcemerge(index=self.index, max_num_segments=1) + + def set_query_arguments(self, ef): + self.ef = ef def query(self, q, n): - body = dict( - query=dict( - script_score=dict( - query=dict(match_all=dict()), - script=dict( - source=self.script, - params=dict(query_vec=q.tolist()) - ) - ) - ) - ) - res = self.es.search(index=self.index, body=body, size=n, _source=False, docvalue_fields=['id'], - stored_fields="_none_", filter_path=["hits.hits.fields.id"]) + knn = dict(field='vec', query_vector=q.tolist(), k=n, num_candidates=self.ef) + res = self.es.knn_search(index=self.index, knn=knn, source=False, docvalue_fields=['id'], + stored_fields="_none_", filter_path=["hits.hits.fields.id"]) return [int(h['fields']['id'][0]) - 1 for h in res['hits']['hits']] def batch_query(self, X, n): From 7ffe4e62af9c81bb61cda39bce7ba503eea1f859 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Tue, 12 Jul 2022 14:02:23 +0300 Subject: [PATCH 02/29] more work --- algos.yaml | 2 +- ann_benchmarks/algorithms/elasticsearch.py | 47 ++++++++++++++++------ 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/algos.yaml b/algos.yaml index e3e294183..942a3a321 100644 --- a/algos.yaml +++ b/algos.yaml @@ -966,7 +966,7 @@ float: docker-tag: ann-benchmarks-elasticsearch module: ann_benchmarks.algorithms.elasticsearch constructor: ElasticsearchScriptScoreQuery - base-args: [ "@metric", "@dimension" ] + base-args: [ "@metric", "@dimension", "@connection" ] run-groups: M-4: arg-groups: diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index dcb1de015..a1d558a49 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -7,7 +7,7 @@ from urllib.error import URLError from urllib.request import Request, urlopen -from elasticsearch import Elasticsearch +from elasticsearch import Elasticsearch, ConnectionTimeout, BadRequestError from elasticsearch.helpers import bulk from ann_benchmarks.algorithms.base import BaseANN @@ -20,9 +20,9 @@ # logging.basicConfig(level=logging.INFO) # logging.getLogger("elasticsearch").setLevel(logging.INFO) -def es_wait(): +def es_wait(baseurl): print("Waiting for elasticsearch health endpoint...") - req = Request("http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=1s") + req = Request(f"{baseurl}/_cluster/health?wait_for_status=yellow&timeout=1s") for i in range(30): try: res = urlopen(req) @@ -42,17 +42,31 @@ class ElasticsearchScriptScoreQuery(BaseANN): - Dense vector queries: https://www.elastic.co/guide/en/elasticsearch/reference/master/query-dsl-script-score-query.html """ - def __init__(self, metric: str, dimension: int, method_param): + def __init__(self, metric: str, dimension: int, conn_params, method_param): self.name = f"elasticsearch-script-score-query_metric={metric}_dimension={dimension}_params{method_param}" self.metric = {"euclidean": 'l2_norm', "angular": 'cosine'}[metric] self.method_param = method_param self.dimension = dimension + self.timeout = 60 * 60 + h = conn_params['host'] if conn_params['host'] is not None else 'localhost' + p = conn_params['port'] if conn_params['port'] is not None else '9200' + self.url = f"http://{h}:{p}" self.index = f"es-ssq-{metric}-{dimension}" - self.es = Elasticsearch(["http://localhost:9200"]) + self.es = Elasticsearch([self.url]) self.batch_res = [] - es_wait() + es_wait(self.url) def fit(self, X): + def wait_for_readiness(): + ready = False + for i in range(self.timeout): + stats = self.es.indices.stats(index=self.index) + if stats['_shards']['total'] == stats['_shards']['successful']: + ready = True + break + sleep(1) + return ready + mappings = dict( properties=dict( id=dict(type="keyword", store=True), @@ -65,11 +79,13 @@ def fit(self, X): ) ) ) - if self.es.indices.exists(index=self.index): - print('deleteing...', end=' ') - self.es.indices.delete(index=self.index) - print('done!') - self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0)) + try: + self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0), timeout=f'{self.timeout}m') + except ConnectionTimeout as e: + if not wait_for_readiness(): + raise e + except BadRequestError as e: + if 'resource_already_exists_exception' not in e.message: raise e # self.es.indices.put_mapping(properties=properties, index=self.index) @@ -80,8 +96,13 @@ def gen(): (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) assert len(errors) == 0, errors - self.es.indices.refresh(index=self.index) - self.es.indices.forcemerge(index=self.index, max_num_segments=1) + try: + self.es.indices.refresh(index=self.index) + except ConnectionTimeout as e: + if not wait_for_readiness(): + raise e + self.es.indices.refresh(index=self.index) + # self.es.indices.forcemerge(index=self.index, max_num_segments=1) def set_query_arguments(self, ef): self.ef = ef From a0df4561ab369d675d197f78beb58352aa3abc8e Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 17 Jul 2022 16:29:23 +0300 Subject: [PATCH 03/29] make it work --- ann_benchmarks/algorithms/elasticsearch.py | 10 +++++----- multirun.py | 17 ++++++++++++++--- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index a1d558a49..5a54cce2c 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -51,8 +51,8 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): h = conn_params['host'] if conn_params['host'] is not None else 'localhost' p = conn_params['port'] if conn_params['port'] is not None else '9200' self.url = f"http://{h}:{p}" - self.index = f"es-ssq-{metric}-{dimension}" - self.es = Elasticsearch([self.url]) + self.index = "ann_benchmark" + self.es = Elasticsearch([self.url], request_timeout=self.timeout) self.batch_res = [] es_wait(self.url) @@ -91,7 +91,7 @@ def wait_for_readiness(): def gen(): for i, vec in enumerate(X): - yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(i + 1) } + yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(i) } (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) assert len(errors) == 0, errors @@ -102,7 +102,7 @@ def gen(): if not wait_for_readiness(): raise e self.es.indices.refresh(index=self.index) - # self.es.indices.forcemerge(index=self.index, max_num_segments=1) + self.es.indices.forcemerge(index=self.index, max_num_segments=1) def set_query_arguments(self, ef): self.ef = ef @@ -111,7 +111,7 @@ def query(self, q, n): knn = dict(field='vec', query_vector=q.tolist(), k=n, num_candidates=self.ef) res = self.es.knn_search(index=self.index, knn=knn, source=False, docvalue_fields=['id'], stored_fields="_none_", filter_path=["hits.hits.fields.id"]) - return [int(h['fields']['id'][0]) - 1 for h in res['hits']['hits']] + return [int(h['fields']['id'][0]) for h in res['hits']['hits']] def batch_query(self, X, n): self.batch_res = [self.query(q, n) for q in X] diff --git a/multirun.py b/multirun.py index b81e46b36..7c7c67a48 100644 --- a/multirun.py +++ b/multirun.py @@ -18,6 +18,8 @@ import pinecone +from elasticsearch import Elasticsearch + def aggregate_outputs(files, clients): different_attrs = set([f.split('client')[0] for f in files]) @@ -155,15 +157,17 @@ def aggregate_outputs(files, clients): print("Changing the workdir to {}".format(workdir)) os.chdir(workdir) - isredis = True if 'redisearch' in args.algorithm else False - ismilvus = True if 'milvus' in args.algorithm else False - ispinecone = True if 'pinecone' in args.algorithm else False + isredis = 'redisearch' in args.algorithm + ismilvus = 'milvus' in args.algorithm + ispinecone = 'pinecone' in args.algorithm + iselastic = 'elasticsearch' in args.algorithm if args.host is None: args.host = 'localhost' if args.port is None: if isredis: args.port = '6379' elif ismilvus: args.port = '19530' + elif iselastic: args.port = '9200' if isredis: redis = RedisCluster if args.cluster else Redis @@ -172,6 +176,8 @@ def aggregate_outputs(files, clients): connections.connect(host=args.host, port=args.port) elif ispinecone: pinecone.init(api_key=args.auth) + elif iselastic: + es = Elasticsearch([f'http://{args.host}:{args.port}'], request_timeout=3600) if args.run_group is not None: run_groups = [args.run_group] @@ -221,6 +227,9 @@ def on_created_or_modified(event): elif ispinecone: for idx in pinecone.list_indexes(): pinecone.delete_index(idx) + elif iselastic: + for idx in es.indices.stats()['indices']: + es.indices.delete(index=idx) results_dict = {} curr_base_build = base_build + ' --run-group ' + run_group @@ -245,6 +254,8 @@ def on_created_or_modified(event): if not args.cluster: # TODO: get total size from all the shards index_size = float(redis.ft('ann_benchmark').info()['vector_index_sz_mb']) * 1024 f.attrs["index_size"] = index_size + elif iselastic: + f.attrs["index_size"] = es.indices.stats(index='ann_benchmark')['indices']['ann_benchmark']['total']['store']['size_in_bytes'] f.close() results_dict["build"] = {"total_clients": args.build_clients, "build_time": total_time, "vector_index_sz_mb": index_size} From 4140e37a2eff9b740f00dc553f05c863c9cee26e Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 17 Jul 2022 19:07:43 +0300 Subject: [PATCH 04/29] added certification support --- ann_benchmarks/algorithms/elasticsearch.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 5a54cce2c..2dc0b7609 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -4,11 +4,13 @@ """ import logging from time import sleep +from os import environ from urllib.error import URLError from urllib.request import Request, urlopen from elasticsearch import Elasticsearch, ConnectionTimeout, BadRequestError from elasticsearch.helpers import bulk +from elastic_transport.client_utils import DEFAULT from ann_benchmarks.algorithms.base import BaseANN @@ -20,19 +22,18 @@ # logging.basicConfig(level=logging.INFO) # logging.getLogger("elasticsearch").setLevel(logging.INFO) -def es_wait(baseurl): +def es_wait(es): print("Waiting for elasticsearch health endpoint...") - req = Request(f"{baseurl}/_cluster/health?wait_for_status=yellow&timeout=1s") for i in range(30): try: - res = urlopen(req) - if res.getcode() == 200: + res = es.cluster.health(wait_for_status='yellow', timeout='1s') + if not res['timed_out']: # then status is OK print("Elasticsearch is ready") return except URLError: pass sleep(1) - raise RuntimeError("Failed to connect to local elasticsearch") + raise RuntimeError("Failed to connect to elasticsearch server") class ElasticsearchScriptScoreQuery(BaseANN): @@ -50,11 +51,11 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): self.timeout = 60 * 60 h = conn_params['host'] if conn_params['host'] is not None else 'localhost' p = conn_params['port'] if conn_params['port'] is not None else '9200' - self.url = f"http://{h}:{p}" + self.url = f"https://{h}:{p}" self.index = "ann_benchmark" - self.es = Elasticsearch([self.url], request_timeout=self.timeout) + self.es = Elasticsearch(self.url, request_timeout=self.timeout, basic_auth=(conn_params['user'], conn_params['auth']), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) self.batch_res = [] - es_wait(self.url) + es_wait(self.es) def fit(self, X): def wait_for_readiness(): From 7bda800f24d3e2e656ac4c26c59fda3a6c38e5bf Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 17 Jul 2022 19:08:05 +0300 Subject: [PATCH 05/29] improved importing --- multirun.py | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/multirun.py b/multirun.py index 7c7c67a48..a51725ba9 100644 --- a/multirun.py +++ b/multirun.py @@ -11,15 +11,6 @@ from ann_benchmarks.results import get_result_filename from ann_benchmarks.algorithms.definitions import get_run_groups -from redis import Redis -from redis.cluster import RedisCluster - -from pymilvus import utility, connections - -import pinecone - -from elasticsearch import Elasticsearch - def aggregate_outputs(files, clients): different_attrs = set([f.split('client')[0] for f in files]) @@ -157,10 +148,26 @@ def aggregate_outputs(files, clients): print("Changing the workdir to {}".format(workdir)) os.chdir(workdir) - isredis = 'redisearch' in args.algorithm - ismilvus = 'milvus' in args.algorithm - ispinecone = 'pinecone' in args.algorithm - iselastic = 'elasticsearch' in args.algorithm + # All supported algorithms that need spacial stuff + isredis = ismilvus = ispinecone = iselastic = False + + if 'redisearch' in args.algorithm: + from redis import Redis + from redis.cluster import RedisCluster + isredis = True + + elif 'milvus' in args.algorithm: + from pymilvus import utility, connections + ismilvus = True + + elif 'pinecone' in args.algorithm: + import pinecone + ispinecone = True + + elif 'elasticsearch' in args.algorithm: + from elasticsearch import Elasticsearch + from elastic_transport.client_utils import DEFAULT + iselastic = True if args.host is None: args.host = 'localhost' @@ -177,7 +184,9 @@ def aggregate_outputs(files, clients): elif ispinecone: pinecone.init(api_key=args.auth) elif iselastic: - es = Elasticsearch([f'http://{args.host}:{args.port}'], request_timeout=3600) + args.user = args.user if args.user is not None else 'elastic' + args.auth = args.auth if args.auth is not None else os.environ.get('ELASTIC_PASSWORD', '') + es = Elasticsearch([f'https://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT)) if args.run_group is not None: run_groups = [args.run_group] From d5dddc8582557c64fbba6a48959e437658f05bf8 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 17 Jul 2022 19:46:33 +0300 Subject: [PATCH 06/29] supporting both secure and not secure settings --- ann_benchmarks/algorithms/elasticsearch.py | 8 ++++++-- multirun.py | 5 ++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 2dc0b7609..e7c4ab42a 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -51,9 +51,13 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): self.timeout = 60 * 60 h = conn_params['host'] if conn_params['host'] is not None else 'localhost' p = conn_params['port'] if conn_params['port'] is not None else '9200' - self.url = f"https://{h}:{p}" + u = conn_params['user'] if conn_params['user'] is not None else 'elastic' + a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" - self.es = Elasticsearch(self.url, request_timeout=self.timeout, basic_auth=(conn_params['user'], conn_params['auth']), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) + try: + self.es = Elasticsearch(f"http://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) + except Exception: + self.es = Elasticsearch(f"https://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) self.batch_res = [] es_wait(self.es) diff --git a/multirun.py b/multirun.py index a51725ba9..b9dff1a45 100644 --- a/multirun.py +++ b/multirun.py @@ -186,7 +186,10 @@ def aggregate_outputs(files, clients): elif iselastic: args.user = args.user if args.user is not None else 'elastic' args.auth = args.auth if args.auth is not None else os.environ.get('ELASTIC_PASSWORD', '') - es = Elasticsearch([f'https://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT)) + try: + es = Elasticsearch([f'http://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT)) + except Exception: + es = Elasticsearch([f'https://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT)) if args.run_group is not None: run_groups = [args.run_group] From e40f17c2b5c0862859b8076edbbcedc20af624c7 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Tue, 19 Jul 2022 10:55:07 +0300 Subject: [PATCH 07/29] some cleanup --- ann_benchmarks/algorithms/elasticsearch.py | 30 ++++------------------ multirun.py | 3 ++- requirements.txt | 1 + 3 files changed, 8 insertions(+), 26 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index e7c4ab42a..3a9b4d46e 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -6,9 +6,8 @@ from time import sleep from os import environ from urllib.error import URLError -from urllib.request import Request, urlopen -from elasticsearch import Elasticsearch, ConnectionTimeout, BadRequestError +from elasticsearch import Elasticsearch, BadRequestError from elasticsearch.helpers import bulk from elastic_transport.client_utils import DEFAULT @@ -55,23 +54,14 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" try: - self.es = Elasticsearch(f"http://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) + self.es = Elasticsearch(f"http://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), refresh_interval=-1) + self.es.info() except Exception: self.es = Elasticsearch(f"https://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) self.batch_res = [] es_wait(self.es) def fit(self, X): - def wait_for_readiness(): - ready = False - for i in range(self.timeout): - stats = self.es.indices.stats(index=self.index) - if stats['_shards']['total'] == stats['_shards']['successful']: - ready = True - break - sleep(1) - return ready - mappings = dict( properties=dict( id=dict(type="keyword", store=True), @@ -85,15 +75,10 @@ def wait_for_readiness(): ) ) try: - self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0), timeout=f'{self.timeout}m') - except ConnectionTimeout as e: - if not wait_for_readiness(): - raise e + self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0)) except BadRequestError as e: if 'resource_already_exists_exception' not in e.message: raise e - # self.es.indices.put_mapping(properties=properties, index=self.index) - def gen(): for i, vec in enumerate(X): yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(i) } @@ -101,12 +86,7 @@ def gen(): (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) assert len(errors) == 0, errors - try: - self.es.indices.refresh(index=self.index) - except ConnectionTimeout as e: - if not wait_for_readiness(): - raise e - self.es.indices.refresh(index=self.index) + self.es.indices.refresh(index=self.index) self.es.indices.forcemerge(index=self.index, max_num_segments=1) def set_query_arguments(self, ef): diff --git a/multirun.py b/multirun.py index b9dff1a45..41d19ec13 100644 --- a/multirun.py +++ b/multirun.py @@ -187,7 +187,8 @@ def aggregate_outputs(files, clients): args.user = args.user if args.user is not None else 'elastic' args.auth = args.auth if args.auth is not None else os.environ.get('ELASTIC_PASSWORD', '') try: - es = Elasticsearch([f'http://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT)) + es = Elasticsearch([f'http://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth)) + es.info() except Exception: es = Elasticsearch([f'https://{args.host}:{args.port}'], request_timeout=3600, basic_auth=(args.user, args.auth), ca_certs=os.environ.get('ELASTIC_CA', DEFAULT)) diff --git a/requirements.txt b/requirements.txt index fc3552900..fbb04bea5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,3 +11,4 @@ jinja2==2.10 pymilvus==2.0.2 pinecone-client==2.0.11 redis==4.3.2 +elasticsearch==8.3.1 From e11959270992e83e97e8a07ad15733b7d3bde43e Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Tue, 19 Jul 2022 10:59:36 +0300 Subject: [PATCH 08/29] another requirement update --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index fbb04bea5..2885ef7f9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,7 +7,7 @@ pyyaml==5.4 psutil==5.6.6 scipy==1.3.3 scikit-learn==0.22.2 -jinja2==2.10 +jinja2==3.1.2 pymilvus==2.0.2 pinecone-client==2.0.11 redis==4.3.2 From 118e2d91a5153b109f70d7c453f291c59d628bc8 Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Wed, 20 Jul 2022 10:30:15 +0300 Subject: [PATCH 09/29] improved multirun on existing index --- multirun.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/multirun.py b/multirun.py index 41d19ec13..83faa6b7f 100644 --- a/multirun.py +++ b/multirun.py @@ -232,23 +232,23 @@ def on_created_or_modified(event): observer.start() for run_group in run_groups: - if isredis: - redis.flushall() - elif ismilvus: - if utility.has_collection('milvus'): - utility.drop_collection('milvus') - elif ispinecone: - for idx in pinecone.list_indexes(): - pinecone.delete_index(idx) - elif iselastic: - for idx in es.indices.stats()['indices']: - es.indices.delete(index=idx) - results_dict = {} curr_base_build = base_build + ' --run-group ' + run_group curr_base_test = base_test + ' --run-group ' + run_group if int(args.build_clients) > 0: + if isredis: + redis.flushall() + elif ismilvus: + if utility.has_collection('milvus'): + utility.drop_collection('milvus') + elif ispinecone: + for idx in pinecone.list_indexes(): + pinecone.delete_index(idx) + elif iselastic: + for idx in es.indices.stats()['indices']: + es.indices.delete(index=idx) + clients = [Process(target=os.system, args=(curr_base_build + ' --client-id ' + str(i),)) for i in range(1, int(args.build_clients) + 1)] From fcf83f6729e6033f996767e27e005435ccfc2040 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:08:54 +0100 Subject: [PATCH 10/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 25 +++++++++++++--------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 3a9b4d46e..aebdab98b 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -61,7 +61,9 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): self.batch_res = [] es_wait(self.es) - def fit(self, X): + def fit(self, X, offset=0, limit=None): + limit = limit if limit else len(X) + X = X[offset:limit] mappings = dict( properties=dict( id=dict(type="keyword", store=True), @@ -78,16 +80,19 @@ def fit(self, X): self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0)) except BadRequestError as e: if 'resource_already_exists_exception' not in e.message: raise e - - def gen(): - for i, vec in enumerate(X): - yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(i) } - - (_, errors) = bulk(self.es, gen(), chunk_size=500, max_retries=9) - assert len(errors) == 0, errors - + bulk_size = 500 + for bulk in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: + print(f'inserting vectors {offset} to {len(bulk)}') + offset += len(bulk) + def gen(): + for i, vec in enumerate(X): + yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } + (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) + assert len(errors) == 0, errors + + print('refreshing elastic index...') self.es.indices.refresh(index=self.index) - self.es.indices.forcemerge(index=self.index, max_num_segments=1) + print('finished refreshing elastic index...') def set_query_arguments(self, ef): self.ef = ef From ba70afd319ada5660be92da11554f2e8a8069294 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:14:39 +0100 Subject: [PATCH 11/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index aebdab98b..ddb50bf44 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -64,6 +64,7 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): def fit(self, X, offset=0, limit=None): limit = limit if limit else len(X) X = X[offset:limit] + X = X.tolist() mappings = dict( properties=dict( id=dict(type="keyword", store=True), From de767ff4553a76cc99a99859bc0ed943979c965a Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:33:06 +0100 Subject: [PATCH 12/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index ddb50bf44..fac42e326 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -86,7 +86,7 @@ def fit(self, X, offset=0, limit=None): print(f'inserting vectors {offset} to {len(bulk)}') offset += len(bulk) def gen(): - for i, vec in enumerate(X): + for i, vec in enumerate(bulk): yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) assert len(errors) == 0, errors From 93448a5fa3ccb3575f64798578cc48965a0d95bd Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:36:37 +0100 Subject: [PATCH 13/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index fac42e326..3a31bef8b 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -87,7 +87,7 @@ def fit(self, X, offset=0, limit=None): offset += len(bulk) def gen(): for i, vec in enumerate(bulk): - yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } + yield { "_op_type": "index", "_index": self.index, "vec": vec, 'id': str(offset+i) } (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) assert len(errors) == 0, errors From 48179a907979ec22fbb7b3dd4db82cc50966cf9a Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:43:26 +0100 Subject: [PATCH 14/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 3a31bef8b..caf3e7ba4 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -63,8 +63,6 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): def fit(self, X, offset=0, limit=None): limit = limit if limit else len(X) - X = X[offset:limit] - X = X.tolist() mappings = dict( properties=dict( id=dict(type="keyword", store=True), @@ -85,6 +83,7 @@ def fit(self, X, offset=0, limit=None): for bulk in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: print(f'inserting vectors {offset} to {len(bulk)}') offset += len(bulk) + print(bulk[0]) def gen(): for i, vec in enumerate(bulk): yield { "_op_type": "index", "_index": self.index, "vec": vec, 'id': str(offset+i) } From 5666f807c516e4d60a97ec59fbd051fb79179764 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:44:29 +0100 Subject: [PATCH 15/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index caf3e7ba4..f1f5f1616 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -83,10 +83,10 @@ def fit(self, X, offset=0, limit=None): for bulk in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: print(f'inserting vectors {offset} to {len(bulk)}') offset += len(bulk) - print(bulk[0]) + # print(bulk[0]) def gen(): for i, vec in enumerate(bulk): - yield { "_op_type": "index", "_index": self.index, "vec": vec, 'id': str(offset+i) } + yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) assert len(errors) == 0, errors From c7028a89adc1bb7b4afad4c6a280e955e0766eff Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:46:36 +0100 Subject: [PATCH 16/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index f1f5f1616..a05d9b93f 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -84,10 +84,11 @@ def fit(self, X, offset=0, limit=None): print(f'inserting vectors {offset} to {len(bulk)}') offset += len(bulk) # print(bulk[0]) - def gen(): - for i, vec in enumerate(bulk): - yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } - (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) + operations = [] + # def gen(): + for i, vec in enumerate(bulk): + operations.append( { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) }) + (_, errors) = bulk(self.es, operations, chunk_size=bulk_size, max_retries=9) assert len(errors) == 0, errors print('refreshing elastic index...') From a7e823a822389dfeb0962052bc59c09eaa9777dc Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:50:01 +0100 Subject: [PATCH 17/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index a05d9b93f..66bee0f25 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -80,15 +80,14 @@ def fit(self, X, offset=0, limit=None): except BadRequestError as e: if 'resource_already_exists_exception' not in e.message: raise e bulk_size = 500 - for bulk in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: + for bulk_array in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: print(f'inserting vectors {offset} to {len(bulk)}') offset += len(bulk) - # print(bulk[0]) - operations = [] - # def gen(): - for i, vec in enumerate(bulk): - operations.append( { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) }) - (_, errors) = bulk(self.es, operations, chunk_size=bulk_size, max_retries=9) + + def gen(): + for i, vec in enumerate(bulk_array): + yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } + (_, errors) = bulk(self.es, gen, chunk_size=bulk_size, max_retries=9) assert len(errors) == 0, errors print('refreshing elastic index...') From 50dc722e081f7c2ae8b6526dbf66133014b821b0 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:50:29 +0100 Subject: [PATCH 18/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 66bee0f25..19725d714 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -81,8 +81,8 @@ def fit(self, X, offset=0, limit=None): if 'resource_already_exists_exception' not in e.message: raise e bulk_size = 500 for bulk_array in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: - print(f'inserting vectors {offset} to {len(bulk)}') - offset += len(bulk) + print(f'inserting vectors {offset} to {len(bulk_array)}') + offset += len(bulk_array) def gen(): for i, vec in enumerate(bulk_array): From 5d5a3fea57052271f2039cd62a5421fc43581a76 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:51:00 +0100 Subject: [PATCH 19/29] Improvements over the elastic client v2 --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 19725d714..2641203f8 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -87,7 +87,7 @@ def fit(self, X, offset=0, limit=None): def gen(): for i, vec in enumerate(bulk_array): yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } - (_, errors) = bulk(self.es, gen, chunk_size=bulk_size, max_retries=9) + (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) assert len(errors) == 0, errors print('refreshing elastic index...') From 4c1bfcf4d6d4414d81fa9e1e3a43b2de32f4a300 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 16:58:52 +0100 Subject: [PATCH 20/29] Updating elastic client: WIP --- ann_benchmarks/algorithms/elasticsearch.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 2641203f8..ff0965030 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -109,3 +109,7 @@ def batch_query(self, X, n): def get_batch_results(self): return self.batch_res + def freeIndex(self): + print("Deleting elastic index named {}".format(self.index)) + self.es.indices.delete(index=self.index) + print("Finished deleting elastic index named {}".format(self.index)) From b5216d172fe946f707b09a6bb8767238dce80b64 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 17:32:29 +0100 Subject: [PATCH 21/29] Updating elastic client: WIP --- ann_benchmarks/algorithms/elasticsearch.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index ff0965030..25f6ab069 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -53,11 +53,8 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): u = conn_params['user'] if conn_params['user'] is not None else 'elastic' a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" - try: - self.es = Elasticsearch(f"http://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), refresh_interval=-1) - self.es.info() - except Exception: - self.es = Elasticsearch(f"https://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), ca_certs=environ.get('ELASTIC_CA', DEFAULT)) + self.es = Elasticsearch("{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a)) + self.es.info() self.batch_res = [] es_wait(self.es) From 3f63b05e3f1f0437c30a66466748741adf278c80 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 17:34:11 +0100 Subject: [PATCH 22/29] Updating elastic client: WIP --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 25f6ab069..e093b74a1 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -53,7 +53,7 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): u = conn_params['user'] if conn_params['user'] is not None else 'elastic' a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" - self.es = Elasticsearch("{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a)) + self.es = Elasticsearch("{h}:{p}".format(h,p), request_timeout=self.timeout, basic_auth=(u, a)) self.es.info() self.batch_res = [] es_wait(self.es) From 887b7ac373bf78e67c45ea0a123efd1869f648e0 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 17:34:28 +0100 Subject: [PATCH 23/29] Updating elastic client: WIP --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index e093b74a1..cbc769738 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -53,7 +53,7 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): u = conn_params['user'] if conn_params['user'] is not None else 'elastic' a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" - self.es = Elasticsearch("{h}:{p}".format(h,p), request_timeout=self.timeout, basic_auth=(u, a)) + self.es = Elasticsearch("{}:{}".format(h,p), request_timeout=self.timeout, basic_auth=(u, a)) self.es.info() self.batch_res = [] es_wait(self.es) From 7dfbfd88efb7e77521dd903f2bb2ae0ff3a2a5ee Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 20:07:55 +0100 Subject: [PATCH 24/29] Updating elastic client: WIP --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index cbc769738..ce8dbba0f 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -78,7 +78,7 @@ def fit(self, X, offset=0, limit=None): if 'resource_already_exists_exception' not in e.message: raise e bulk_size = 500 for bulk_array in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: - print(f'inserting vectors {offset} to {len(bulk_array)}') + print(f'inserting vectors {offset} to {offset+len(bulk_array)}') offset += len(bulk_array) def gen(): From 0907a82d03faf7555f3fec8210273b02af652160 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 21 Jul 2022 22:03:39 +0100 Subject: [PATCH 25/29] Updating elastic client: WIP --- ann_benchmarks/algorithms/elasticsearch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index ce8dbba0f..2b01e27e5 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -53,7 +53,7 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): u = conn_params['user'] if conn_params['user'] is not None else 'elastic' a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" - self.es = Elasticsearch("{}:{}".format(h,p), request_timeout=self.timeout, basic_auth=(u, a)) + self.es = Elasticsearch("{}:{}".format(h,p), request_timeout=self.timeout, basic_auth=(u, a), verify_certs=False) self.es.info() self.batch_res = [] es_wait(self.es) From 72e485076abf60befe9c71de92b42188afe5118d Mon Sep 17 00:00:00 2001 From: GuyAv46 Date: Sun, 24 Jul 2022 10:17:13 +0300 Subject: [PATCH 26/29] added shard settings --- ann_benchmarks/algorithms/elasticsearch.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 3a9b4d46e..989cf4aa8 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -53,6 +53,7 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): u = conn_params['user'] if conn_params['user'] is not None else 'elastic' a = conn_params['auth'] if conn_params['auth'] is not None else '' self.index = "ann_benchmark" + self.shards = conn_params['shards'] try: self.es = Elasticsearch(f"http://{h}:{p}", request_timeout=self.timeout, basic_auth=(u, a), refresh_interval=-1) self.es.info() @@ -75,7 +76,7 @@ def fit(self, X): ) ) try: - self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=1, number_of_replicas=0)) + self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=self.shards, number_of_replicas=0)) except BadRequestError as e: if 'resource_already_exists_exception' not in e.message: raise e From 6c4933ddee51815d369d5477b38b27a3befdbcc0 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Wed, 27 Jul 2022 23:06:42 +0100 Subject: [PATCH 27/29] using wait_for on bulk index operation --- ann_benchmarks/algorithms/elasticsearch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 2b01e27e5..dd0a4459d 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -25,7 +25,7 @@ def es_wait(es): print("Waiting for elasticsearch health endpoint...") for i in range(30): try: - res = es.cluster.health(wait_for_status='yellow', timeout='1s') + res = es.cluster.health(wait_for_status='green', timeout='1s') if not res['timed_out']: # then status is OK print("Elasticsearch is ready") return @@ -84,7 +84,7 @@ def fit(self, X, offset=0, limit=None): def gen(): for i, vec in enumerate(bulk_array): yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } - (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9) + (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9, refresh="wait_for") assert len(errors) == 0, errors print('refreshing elastic index...') From bc531eb480a024c1b74ce46c8b494530aac6ade7 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 28 Jul 2022 11:35:40 +0100 Subject: [PATCH 28/29] Fixed last bulk index operation ids --- ann_benchmarks/algorithms/elasticsearch.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index e5614041e..972afdda1 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -3,6 +3,7 @@ Note that this requires X-Pack, which is not included in the OSS version of Elasticsearch. """ import logging +import os from time import sleep from os import environ from urllib.error import URLError @@ -56,6 +57,8 @@ def __init__(self, metric: str, dimension: int, conn_params, method_param): self.es = Elasticsearch("{}:{}".format(h,p), request_timeout=self.timeout, basic_auth=(u, a), verify_certs=False) self.es.info() self.shards = conn_params['shards'] + self.replicas = int(os.getenv('ES_REPLICA_COUNT', "0")) + self.bulk_size = int(os.getenv('ES_BULK_SIZE', "500")) self.batch_res = [] es_wait(self.es) @@ -74,19 +77,20 @@ def fit(self, X, offset=0, limit=None): ) ) try: - self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=self.shards, number_of_replicas=0)) + self.es.indices.create(index=self.index, mappings=mappings, settings=dict(number_of_shards=self.shards, number_of_replicas=self.replicas)) except BadRequestError as e: if 'resource_already_exists_exception' not in e.message: raise e - bulk_size = 500 + bulk_size = self.bulk_size + print(f'Using a bulk size of {bulk_size} vectors') for bulk_array in [X[i: i+bulk_size] for i in range(0, len(X), bulk_size)]: - print(f'inserting vectors {offset} to {offset+len(bulk_array)}') - offset += len(bulk_array) - + bulk_array_len = len(bulk_array) + print(f'inserting vectors {offset} to {offset+bulk_array_len}') def gen(): for i, vec in enumerate(bulk_array): yield { "_op_type": "index", "_index": self.index, "vec": vec.tolist(), 'id': str(offset+i) } (_, errors) = bulk(self.es, gen(), chunk_size=bulk_size, max_retries=9, refresh="wait_for") assert len(errors) == 0, errors + offset += bulk_array_len print('refreshing elastic index...') self.es.indices.refresh(index=self.index) From aa1f3ff026590c1d34d9dbfc30bcca81d16b93f0 Mon Sep 17 00:00:00 2001 From: filipecosta90 Date: Thu, 28 Jul 2022 16:38:57 +0100 Subject: [PATCH 29/29] disable urllib InsecureRequestWarning --- ann_benchmarks/algorithms/elasticsearch.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ann_benchmarks/algorithms/elasticsearch.py b/ann_benchmarks/algorithms/elasticsearch.py index 972afdda1..85063e98b 100644 --- a/ann_benchmarks/algorithms/elasticsearch.py +++ b/ann_benchmarks/algorithms/elasticsearch.py @@ -5,19 +5,20 @@ import logging import os from time import sleep -from os import environ from urllib.error import URLError from elasticsearch import Elasticsearch, BadRequestError from elasticsearch.helpers import bulk -from elastic_transport.client_utils import DEFAULT - +import urllib3 from ann_benchmarks.algorithms.base import BaseANN # Configure the elasticsearch logger. # By default, it writes an INFO statement for every request. logging.getLogger("elasticsearch").setLevel(logging.WARN) +# Disable InsecureRequestWarning +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + # Uncomment these lines if you want to see timing for every HTTP request and its duration. # logging.basicConfig(level=logging.INFO) # logging.getLogger("elasticsearch").setLevel(logging.INFO)