import logging
from typing import Union, List

from elasticsearch import Elasticsearch
# import elasticsearch.helpers  # type: ignore

from towhee.operator import PyOperator, SharedType  # type: ignore

logger = logging.getLogger()


class ESSearch(PyOperator):
    """
    Search using ElasticSearch with client ready

    Args:
        host (`str`): host to connect ElasticSearch client
        port (`int`): port to connect ElasticSearch client
        user (`str=None`): user name to connect ElasticSearch client, defaults to None
        password (`str=None`): user password to connect ElasticSearch client, defaults to None
        ca_certs (`str=None`): path to CA certificate, defaults to None
    """

    def __init__(self, host: str, port: int, user: str = None, password: str = None, ca_certs: str = None):
        super().__init__()
        # The URL scheme follows the presence of a CA certificate:
        # https when `ca_certs` is given, plain http otherwise.
        scheme = 'https' if ca_certs else 'http'
        # Credentials are only sent when both user and password are provided.
        basic_auth = (user, password) if user and password else None
        try:
            self.client = Elasticsearch(
                f'{scheme}://{host}:{port}',
                ca_certs=ca_certs,
                basic_auth=basic_auth)
            logger.info('Successfully connected to ElasticSearch client.')
        except Exception as e:
            # The '%s' placeholder is required: without it the logging module
            # fails to format the record and the real error is never reported.
            logger.error('Failed to connect ElasticSearch client:\n%s', e)
            raise

    def __call__(self, index_name: str, query: dict = None, enable: bool = True):
        """
        Run a search request against an index.

        Args:
            index_name (`str`): name of the index to search
            query (`dict=None`): query body passed to the client's `search` call;
                when None, `{'match_all': {}}` is used
            enable (`bool=True`): when False, no request is made and an empty
                string is returned

        Returns:
            The response returned by `self.client.search`, or `''` when
            `enable` is False.
        """
        if not enable:
            return ''
        if query is None:
            # Build the default per call so no dict is shared between calls.
            query = {'match_all': {}}
        return self.client.search(index=index_name, query=query)

    @property
    def shared_type(self):
        """Sharing mode reported for this operator: `SharedType.NotShareable`."""
        return SharedType.NotShareable