logo
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Readme
Files and versions

45 lines
1.5 KiB

import logging
from typing import Union, List
from elasticsearch import Elasticsearch
# import elasticsearch.helpers # type: ignore
from towhee.operator import PyOperator, SharedType # type: ignore
logger = logging.getLogger()
class ESSearch(PyOperator):
    """
    Search an ElasticSearch index through a client built at construction time.

    All keyword arguments are forwarded to ``Elasticsearch(...)``. The only
    special handling is for ``host``/``port``: when ``port`` is given, the pair
    is folded into a single ``hosts=['https://{host}:{port}']`` entry.

    Args:
        host (`str`): host to connect ElasticSearch client; required whenever `port` is given
        port (`int`): port to connect ElasticSearch client
        user (`str=None`): user name to connect ElasticSearch client, defaults to None
        password (`str=None`): user password to connect ElasticSearch client, defaults to None
        ca_certs (`str=None`): path to CA certificate, defaults to None

    NOTE(review): apart from `host`/`port`, keyword arguments reach the client
    verbatim -- confirm that the installed client version accepts the
    `user`/`password` names as listed above.
    """

    def __init__(self, **connection_kwargs):
        """
        Create the ElasticSearch client from the given connection kwargs.

        Raises:
            AssertionError: if `port` is given without `host`.
            Exception: anything raised while building the client is logged
                and re-raised unchanged.
        """
        super().__init__()
        try:
            if 'port' in connection_kwargs:
                # Raised explicitly (instead of a bare `assert`) so the check
                # still runs under `python -O`; the exception type is kept.
                if 'host' not in connection_kwargs:
                    raise AssertionError('Missing host in connection kwargs but given port only.')
                # Pop both keys so that only the combined `hosts` entry is
                # forwarded to the client.
                host = connection_kwargs.pop('host')
                port = connection_kwargs.pop('port')
                connection_kwargs['hosts'] = [f'https://{host}:{port}']
            self.client = Elasticsearch(**connection_kwargs)
            logger.info('Successfully connected to ElasticSearch client.')
        except Exception as e:
            # The '%s' placeholder is required: without it, logging cannot
            # merge `e` into the message and the record fails to format.
            logger.error('Failed to connect ElasticSearch client:\n%s', e)
            raise

    # The default `query` dict is shared across calls; it is only read here
    # and must not be mutated.
    def __call__(self, index_name: str, query: dict = {'match_all': {}}):
        """
        Run `query` against `index_name` and return the client's response.

        Args:
            index_name (`str`): name of the index to search
            query (`dict`): query body passed to the client's `search`;
                defaults to a match-all query

        Returns:
            The response of `client.search`, or an empty string when `query`
            is falsy (e.g. `None` or `{}`).
        """
        if not query:
            return ''
        resp = self.client.search(index=index_name, query=query)
        return resp

    @property
    def shared_type(self):
        """Sharing mode reported for this operator: `SharedType.NotShareable`."""
        return SharedType.NotShareable