From 0cc7a5a6a6cb673abb354aca0bae261009535015 Mon Sep 17 00:00:00 2001
From: shiyu22
Date: Wed, 14 Jun 2023 11:45:25 +0800
Subject: [PATCH] Add osschat-index

---
 README.md   |  78 +++++++++++++++++++++++++++++++++++++-
 __init__.py |   4 ++
 es_index.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 188 insertions(+), 1 deletion(-)
 create mode 100644 __init__.py
 create mode 100644 es_index.py

diff --git a/README.md b/README.md
index 2c42d7a..0553c9b 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,78 @@
-# osschat-index
+# ElasticSearch Index
+*author: Jael*
+
+<br />
+
+## Description
+
+The index operator indexes the given documents in ElasticSearch so that they are ready for retrieval.
+It accepts either a single document (a dictionary) or a list of documents (dictionaries) as input.
+For each document, the index automatically generates a unique id.
+To use this operator, you need to [set up ElasticSearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html) in advance.
+
+<br />
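+A quick way to confirm that your deployment is reachable before building the pipeline is to ping it with the official Python client. The snippet below is only a minimal sketch; the URL, credentials, and certificate path are placeholders that have to be adapted to your own setup.
+
+```python
+from elasticsearch import Elasticsearch
+
+# Placeholder connection details; replace them with the values of your deployment.
+client = Elasticsearch(
+    'https://localhost:9200',
+    basic_auth=('elastic', '<your-password>'),
+    ca_certs='/path/to/http_ca.crt',
+)
+
+assert client.ping(), 'ElasticSearch is not reachable.'
+print(client.info()['version']['number'])
+```
+
+<br />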
+
+## Code Example
+
+Insert an example document into ElasticSearch running at localhost:9200, using the index 'test_index'.
+
+```python
+from datetime import datetime
+from towhee import pipe, ops, DataCollection
+
+
+example_doc = {
+    'title': 'Test Title',
+    'author': 'Towhee',
+    'content': 'This is an example.',
+    'timestamp': datetime.now()
+}
+
+es_insert = (
+    pipe.input('index_name', 'doc')
+        .map(('index_name', 'doc'), 'res',
+             ops.elasticsearch.osschat_index(host='localhost', port=9200))
+        .output('doc', 'res')
+)
+
+res = es_insert('test_index', example_doc)  # OR: es_insert('test_index', [example_doc])
+DataCollection(res).show()  # Optional: display output data
+```
+
+<br />
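+To double-check that the document actually landed in the index, you can query ElasticSearch directly with the official Python client, outside of the Towhee pipeline. This is an illustrative sketch only; the connection arguments are placeholders for your own deployment.
+
+```python
+from elasticsearch import Elasticsearch
+
+# Placeholder connection details; adapt them to your deployment.
+client = Elasticsearch(
+    'https://localhost:9200',
+    basic_auth=('elastic', '<your-password>'),
+    ca_certs='/path/to/http_ca.crt',
+)
+
+# Match on the title inserted above and print the number of hits.
+resp = client.search(index='test_index', query={'match': {'title': 'Test Title'}})
+print(resp['hits']['total']['value'])
+```
+
+<br />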
+
+## Factory Constructor
+
+Create the operator via the following factory method:
+
+***elasticsearch.osschat_index(host='localhost', port=9200, user=None, password=None, ca_certs=None)***
+
+**Parameters:**
+
+***host***: *str*
+
+The host used to connect to the ElasticSearch server.
+
+***port***: *int*
+
+The port used to connect to the ElasticSearch server.
+
+***user***: *str*
+
+The username for the ElasticSearch connection if needed, defaults to None.
+
+***password***: *str*
+
+The password for the ElasticSearch connection if needed, defaults to None.
+
+***ca_certs***: *str*
+
+The path to CA certificates for the ElasticSearch connection if needed, defaults to None.
+
+<br />
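+For a secured deployment, the optional arguments are passed straight through to the underlying client. The call below is only an illustration; the credentials and certificate path are placeholders, and the factory path assumes the operator is published as `elasticsearch/osschat-index` on the Towhee hub.
+
+```python
+from towhee import ops
+
+# Placeholder credentials and certificate path; adjust them to your deployment.
+op = ops.elasticsearch.osschat_index(
+    host='localhost',
+    port=9200,
+    user='elastic',
+    password='<your-password>',
+    ca_certs='/path/to/http_ca.crt',
+)
+```
+
+The resulting operator can be dropped into a `pipe.map(...)` stage exactly as in the code example above.
+
+<br />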
+
+**Returns:**
+
+The bulk-insert result returned by `elasticsearch.helpers.bulk`, i.e. a tuple of the number of successfully indexed documents and a list of errors.
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..9d33678
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,4 @@
+from .es_index import ESIndex
+
+def osschat_index(*args, **kwargs):
+    return ESIndex(*args, **kwargs)
\ No newline at end of file
diff --git a/es_index.py b/es_index.py
new file mode 100644
index 0000000..c7128f4
--- /dev/null
+++ b/es_index.py
@@ -0,0 +1,107 @@
+import logging
+from typing import Optional, Union, List
+
+from elasticsearch import Elasticsearch
+import elasticsearch.helpers  # type: ignore
+
+from towhee.operator import PyOperator, SharedType  # type: ignore
+
+
+logger = logging.getLogger()
+
+
+class ESIndex(PyOperator):
+    """
+    Use bulk to insert docs into an ElasticSearch index; document ids are auto-generated.
+
+    Args:
+        host (`str`): host used to connect to the ElasticSearch server
+        port (`int`): port used to connect to the ElasticSearch server
+        user (`str`, optional): username for the connection, defaults to None
+        password (`str`, optional): password for the connection, defaults to None
+        ca_certs (`str`, optional): path to the CA certificate, defaults to None
+    """
+    def __init__(self, host: str, port: int, user: Optional[str] = None, password: Optional[str] = None, ca_certs: Optional[str] = None):
+        super().__init__()
+        try:
+            self.client = Elasticsearch(
+                f'https://{host}:{port}',
+                ca_certs=ca_certs,
+                basic_auth=(user, password))
+            logger.info('Successfully connected to ElasticSearch client.')
+        except Exception as e:
+            logger.error('Failed to connect to ElasticSearch client: %s', e)
+            raise e
+
+    def __call__(self, index_name: str, doc: Union[dict, List[dict]]):
+        # If the index does not exist, create it with a stop-words analyzer to improve search accuracy.
+        if not self.is_index_exist(index_name):
+            logger.info(f'Index {index_name} does not exist, creating it with the stop-words analyzer.')
+            self.create_index_with_stopwords(index_name)
+
+        # Normalize the input to a list of documents.
+        if isinstance(doc, dict):
+            docs = [doc]
+        else:
+            docs = doc
+
+        for x in docs:
+            assert isinstance(x, dict)
+
+        # Bulk-index all documents; ids are auto-generated because '_id' is not set.
+        actions = [
+            {
+                '_op_type': 'index',
+                '_index': index_name,
+                '_source': d
+            }
+            for d in docs
+        ]
+        res = elasticsearch.helpers.bulk(self.client, actions, refresh=True)
+        return res
+
+    def is_index_exist(self, index_name: str):
+        return self.client.indices.exists(index=index_name)
+
+    def create_index_with_stopwords(self, index_name: str):
+        mappings = {
+            "properties": {
+                "milvus_id": {
+                    "type": "long"
+                },
+                "paragraph": {
+                    "type": "text",
+                    "analyzer": "my_stop_analyzer",
+                    "fields": {
+                        "keyword": {
+                            "type": "keyword",
+                            "ignore_above": 256
+                        }
+                    }
+                },
+                "path": {
+                    "type": "text",
+                    "analyzer": "my_stop_analyzer",
+                    "fields": {
+                        "keyword": {
+                            "type": "keyword",
+                            "ignore_above": 256
+                        }
+                    }
+                }
+            }
+        }
+        settings = {
+            "analysis": {
+                "analyzer": {
+                    "my_stop_analyzer": {
+                        "type": "stop",
+                        "stopwords_path": "stopwords/stopwords-en.txt"
+                    }
+                }
+            },
+            "number_of_shards": 3,
+            "number_of_replicas": 0
+        }
+        self.client.indices.create(index=index_name, mappings=mappings, settings=settings)
+        logger.info(f'Created index {index_name}.')
+
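+
+if __name__ == '__main__':
+    # Illustrative smoke test only; it is not part of the operator interface.
+    # The host, credentials and certificate path below are placeholders and
+    # must be adapted to an actual ElasticSearch deployment.
+    op = ESIndex(host='localhost', port=9200, user='elastic',
+                 password='<your-password>', ca_certs='/path/to/http_ca.crt')
+    # Insert a single document and then a small batch; each call prints the
+    # bulk result, i.e. (number of successfully indexed documents, list of errors).
+    print(op('test_index', {'title': 'hello', 'content': 'single doc'}))
+    print(op('test_index', [{'title': 'hi', 'content': 'doc 1'},
+                            {'title': 'hey', 'content': 'doc 2'}]))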