|
|
@ -34,6 +34,11 @@ class ESIndex(PyOperator): |
|
|
except Exception as e: |
|
|
except Exception as e: |
|
|
logger.error('Failed to connect ElasticSearch client:\n', e) |
|
|
logger.error('Failed to connect ElasticSearch client:\n', e) |
|
|
raise e |
|
|
raise e |
|
|
|
|
|
# If the index does not exist yet, create it with a stop-words analyzer to improve search accuracy. |
|
|
|
|
|
if not self.is_index_exist(index_name): |
|
|
|
|
|
logger.info(f'index{index_name} not exists, will create the index with stopwords analyzer') |
|
|
|
|
|
self.create_index_with_stopwords(index_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def __call__(self, doc: Union[dict, List[dict]]): |
|
|
def __call__(self, doc: Union[dict, List[dict]]): |
|
|
if isinstance(doc, dict): |
|
|
if isinstance(doc, dict): |
|
|
@ -54,3 +59,50 @@ class ESIndex(PyOperator): |
|
|
] |
|
|
] |
|
|
res = elasticsearch.helpers.bulk(self.client, actions, refresh=True) |
|
|
res = elasticsearch.helpers.bulk(self.client, actions, refresh=True) |
|
|
return res |
|
|
return res |
|
|
|
|
|
|
|
|
|
|
|
def is_index_exist(self, index_name: str) -> bool:
    """Report whether ``index_name`` exists according to the connected client.

    Args:
        index_name: Name of the index to look up.

    Returns:
        ``True`` when ``self.client.indices.exists`` reports the index,
        ``False`` otherwise.
    """
    # Depending on the elasticsearch-py version the call may hand back a
    # response wrapper instead of a plain bool; coerce so callers always
    # receive a real ``bool``.
    return bool(self.client.indices.exists(index=index_name))
|
|
|
|
|
|
|
|
|
|
|
def create_index_with_stopwords(
    self,
    index_name: str,
    stopwords_path: str = "stopwords/stopwords-en.txt",
    number_of_shards: int = 3,
    number_of_replicas: int = 0,
) -> None:
    """Create ``index_name`` with a stop-word analyzer on its text fields.

    The index gets three mapped properties: ``milvus_id`` (``long``) and the
    text fields ``paragraph`` and ``path``. Both text fields are analyzed
    with the custom ``my_stop_analyzer`` and expose a ``keyword`` sub-field
    capped at 256 characters.

    Args:
        index_name: Name of the index to create.
        stopwords_path: Value passed as the analyzer's ``stopwords_path``.
            The file is resolved by the Elasticsearch server, not locally.
        number_of_shards: Primary shard count written to the index settings.
        number_of_replicas: Replica count written to the index settings.

    Raises:
        Whatever ``self.client.indices.create`` raises; nothing is caught
        here.
    """
    analyzer_name = "my_stop_analyzer"

    def _analyzed_text_field() -> dict:
        # Return a fresh dict per field so the two mappings never share
        # one mutable object.
        return {
            "type": "text",
            "analyzer": analyzer_name,
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256,
                },
            },
        }

    mappings = {
        "properties": {
            "milvus_id": {"type": "long"},
            "paragraph": _analyzed_text_field(),
            "path": _analyzed_text_field(),
        },
    }
    settings = {
        "analysis": {
            "analyzer": {
                analyzer_name: {
                    "type": "stop",
                    "stopwords_path": stopwords_path,
                },
            },
        },
        "number_of_shards": number_of_shards,
        "number_of_replicas": number_of_replicas,
    }

    self.client.indices.create(index=index_name, mappings=mappings, settings=settings)
    # The original message ran the name straight into the word "index".
    logger.info("created index %s", index_name)
|
|
|
|
|
|
|
|
|