From 7b90c675c0ca1c943a744f9e6bb1e646dd8c5ab4 Mon Sep 17 00:00:00 2001
From: MrPresent-Han
Date: Mon, 6 Mar 2023 20:02:29 +0800
Subject: [PATCH] support es stopwords analyzer inside indexClient

provide ddl with stopanalyzer

Signed-off-by: MrPresent-Han
---
 es_index.py | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/es_index.py b/es_index.py
index 58f2ed2..8cd9db6 100644
--- a/es_index.py
+++ b/es_index.py
@@ -34,6 +34,11 @@ class ESIndex(PyOperator):
         except Exception as e:
             logger.error('Failed to connect ElasticSearch client:\n', e)
             raise e
+        # If the index does not exist yet, create it with the stop-words
+        # analyzer, which is meant to improve search accuracy.
+        if not self.is_index_exist(index_name):
+            logger.info(f'index {index_name} not exists, will create the index with stopwords analyzer')
+            self.create_index_with_stopwords(index_name)
 
     def __call__(self, doc: Union[dict, List[dict]]):
         if isinstance(doc, dict):
@@ -54,3 +59,63 @@ class ESIndex(PyOperator):
         ]
         res = elasticsearch.helpers.bulk(self.client, actions, refresh=True)
         return res
+
+    def is_index_exist(self, index_name: str) -> bool:
+        """Return True if ``index_name`` exists on the connected cluster."""
+        return bool(self.client.indices.exists(index=index_name))
+
+    def create_index_with_stopwords(self, index_name: str):
+        """Create ``index_name`` with a stop-words analyzer on its text fields.
+
+        ``paragraph`` and ``path`` are mapped as ``text`` fields analyzed by
+        ``my_stop_analyzer`` (each with a ``keyword`` sub-field); ``milvus_id``
+        is mapped as ``long``. The index gets 3 shards and 0 replicas.
+
+        NOTE(review): Elasticsearch documents ``stopwords_path`` as absolute or
+        relative to the node's config directory -- confirm that
+        ``stopwords/stopwords-en.txt`` is deployed there.
+
+        An "already exists" rejection (another worker created the index after
+        the caller's existence check) is logged and ignored; any other request
+        error propagates.
+        """
+        stop_text_field = {
+            "type": "text",
+            "analyzer": "my_stop_analyzer",
+            "fields": {
+                "keyword": {
+                    "type": "keyword",
+                    "ignore_above": 256
+                }
+            }
+        }
+        mappings = {
+            "properties": {
+                "milvus_id": {
+                    "type": "long"
+                },
+                "paragraph": stop_text_field,
+                "path": stop_text_field
+            }
+        }
+        settings = {
+            "analysis": {
+                "analyzer": {
+                    "my_stop_analyzer": {
+                        "type": "stop",
+                        "stopwords_path": "stopwords/stopwords-en.txt"
+                    }
+                }
+            },
+            "number_of_shards": 3,
+            "number_of_replicas": 0
+        }
+        try:
+            self.client.indices.create(index=index_name, mappings=mappings, settings=settings)
+        except elasticsearch.exceptions.RequestError as e:
+            # Only the "index already exists" rejection is safe to ignore.
+            if getattr(e, 'error', None) != 'resource_already_exists_exception':
+                raise
+            logger.info(f"index {index_name} already exists, skip creating")
+            return
+        logger.info(f"created index {index_name}")