diff --git a/README.md b/README.md
index 2c42d7a..0553c9b 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,78 @@
-# osschat-index
+# ElasticSearch Index
+*author: Jael*
+
+
+
+## Description
+
+The index operator index the given documents in ElasticSearch to get ready for retrieval.
+It accepts a single document in dictionary or a list of documents (dictionaries) as input.
+For each document, the index automatically generates a unique id.
+To use this operator, you need to [set up ElasticSearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html) in advance.
+
+
+
+## Code Example
+
+Insert an example document into ElasticSearch with address of localhost:9200 and index of 'test_index'.
+
+```python
+from datetime import datetime
+from towhee import pipe, ops, DataCollection
+
+
+example_doc = {
+ 'title': 'Test Title',
+ 'author': 'Towhee',
+ 'content': 'This is an example.',
+ 'timestamp': datetime.now()
+ }
+
+es_insert = (
+ pipe.input('index_name', 'doc')
+ .map(('index_name', 'doc'), 'res', ops.elasticsearch.index_client(
+ host='localhost', port=9200
+ ))
+ .output('doc', 'res')
+)
+
+res = es_insert('test_index', example_doc) # OR: es_insert('test_index', [example_doc])
+DataCollection(res).show() # Optional: display output data
+```
+
+
+
+## Factory Constructor
+
+Create the operator via the following factory method:
+
+***elasticsearch.search(host='localhost', port=9200, user=None, password=None, ca_certs=None)***
+
+**Parameters:**
+
+***host***: *str*
+
+The host to connect ElasticSearch client.
+
+***port***: *int*
+
+The port to connect ElasticSearch client.
+
+***user***: *str*
+
+The username to connect ElasticSearch client if needed, defaults to None.
+
+***password***: *str*
+
+The user password to connect ElasticSearch client if needed, defaults to None.
+
+***ca_certs***: *str*
+
+The path to CA certificates to connect ElasticSearch client if needed, defaults to None.
+
+
+
+**Returns:**
+
+Index response wrapped by `elastic_transport.ObjectApiResponse`.
diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..9d33678
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,4 @@
+from .es_index import ESIndex
+
+def osschat_index(*args, **kwargs):
+ return ESIndex(*args, **kwargs)
\ No newline at end of file
diff --git a/es_index.py b/es_index.py
new file mode 100644
index 0000000..c7128f4
--- /dev/null
+++ b/es_index.py
@@ -0,0 +1,107 @@
+import logging
+from typing import Union, List
+
+from elasticsearch import Elasticsearch
+import elasticsearch.helpers # type: ignore
+
+from towhee.operator import PyOperator, SharedType # type: ignore
+
+
+logger = logging.getLogger()
+
+
+class ESIndex(PyOperator):
+ """
+ Use bulk to insert docs into ElasticSearch index, using auto id generated.
+
+ Args:
+ host (`str`): host to connect ElasticSearch client
+ port (`int`): port to connect ElasticSearch client
+ user (`str=None`): user name to connect ElasticSearch client, defaults to None
+ password (`str=None`): user password to connect ElasticSearch client, defaults to None
+ ca_certs (`str=None`): path to CA certificate, defaults to None
+ """
+ def __init__(self, host: str, port: int, user: str = None, password: str = None, ca_certs: str = None):
+ super().__init__()
+ try:
+ self.client = Elasticsearch(
+ f'https://{host}:{port}',
+ ca_certs=ca_certs,
+ basic_auth=(user, password))
+ logger.info('Successfully connected to ElasticSearch client.')
+ except Exception as e:
+ logger.error('Failed to connect ElasticSearch client:\n', e)
+ raise e
+
+
+ def __call__(self, index_name: str, doc: Union[dict, List[dict]]):
+ # if index not exist, create with stop words analyzer to strengthen the search accuracy
+ if not self.is_index_exist(index_name):
+ logger.info(f'index{index_name} not exists, will create the index with stopwords analyzer')
+ self.create_index_with_stopwords(index_name)
+
+ if isinstance(doc, dict):
+ docs = [doc]
+ else:
+ docs = doc
+
+ for x in docs:
+ assert isinstance(x, dict)
+
+ actions = [
+ {
+ '_op_type': 'index',
+ '_index': index_name,
+ '_source': docs[i]
+ }
+ for i in range(len(docs))
+ ]
+ res = elasticsearch.helpers.bulk(self.client, actions, refresh=True)
+ return res
+
+ def is_index_exist(self, index_name: str):
+ return self.client.indices.exists(index=index_name)
+
+ def create_index_with_stopwords(self, index_name: str):
+ mappings = {
+ "properties": {
+ "milvus_id": {
+ "type": "long"
+ },
+ "paragraph": {
+ "type": "text",
+ "analyzer": "my_stop_analyzer",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ },
+ "path": {
+ "type": "text",
+ "analyzer": "my_stop_analyzer",
+ "fields": {
+ "keyword": {
+ "type": "keyword",
+ "ignore_above": 256
+ }
+ }
+ }
+ }
+ }
+ settings = {
+ "analysis": {
+ "analyzer": {
+ "my_stop_analyzer": {
+ "type": "stop",
+ "stopwords_path": "stopwords/stopwords-en.txt"
+ }
+ }
+ },
+ "number_of_shards": 3,
+ "number_of_replicas": 0
+ }
+ self.client.indices.create(index=index_name, mappings=mappings, settings=settings)
+ logger.info(f"created index{index_name}")
+