import logging from typing import Union, List from elasticsearch import Elasticsearch import elasticsearch.helpers # type: ignore from towhee.operator import PyOperator, SharedType # type: ignore logger = logging.getLogger() class ESIndex(PyOperator): """ Use bulk to insert docs into ElasticSearch index, using auto id generated. Args: host (`str`): host to connect ElasticSearch client port (`int`): port to connect ElasticSearch client index_name (`str`): index name to index input docs user (`str=None`): user name to connect ElasticSearch client, defaults to None password (`str=None`): user password to connect ElasticSearch client, defaults to None ca_certs (`str=None`): path to CA certificate, defaults to None """ def __init__(self, host: str, port: int, index_name: str, user: str = None, password: str = None, ca_certs: str = None): super().__init__() self.index_name = index_name try: self.client = Elasticsearch( f'https://{host}:{port}', ca_certs=ca_certs, basic_auth=(user, password)) logger.info('Successfully connected to ElasticSearch client.') except Exception as e: logger.error('Failed to connect ElasticSearch client:\n', e) raise e def __call__(self, doc: Union[dict, List[dict]]): if isinstance(doc, dict): docs = [doc] else: docs = doc for x in docs: assert isinstance(x, dict) actions = [ { '_op_type': 'index', '_index': self.index_name, '_source': docs[i] } for i in range(len(docs)) ] res = elasticsearch.helpers.bulk(self.client, actions, refresh=True) return res