osschat-search
              
                
                
            
          copied
				 3 changed files with 119 additions and 1 deletions
			
			
		@ -1,2 +1,72 @@ | 
				
			|||
# osschat-search | 
				
			|||
# ElasticSearch Search | 
				
			|||
 | 
				
			|||
*author: Jael* | 
				
			|||
 | 
				
			|||
<br /> | 
				
			|||
 | 
				
			|||
## Description | 
				
			|||
 | 
				
			|||
The search operator runs a search via ElasticSearch client given a query. | 
				
			|||
It takes a dictionary as input, which should be written in [Query DSL](https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html). | 
				
			|||
To use this operator, you need to [set up ElasticSearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html) in advance. | 
				
			|||
 | 
				
			|||
<br /> | 
				
			|||
 | 
				
			|||
## Code Example | 
				
			|||
 | 
				
			|||
With an example ElasticSearch client 'localhost:9200', | 
				
			|||
search across the index 'test_index' using an example query `{'match_all': {}}` to get all documents. | 
				
			|||
 | 
				
			|||
```python | 
				
			|||
from towhee import pipe, ops, DataCollection | 
				
			|||
 | 
				
			|||
 | 
				
			|||
query = {'match_all': {}} | 
				
			|||
es_search = ( | 
				
			|||
    pipe.input('index_name', 'query') | 
				
			|||
        .map(('index_name', 'query'), 'res', ops.elasticsearch.search_client( | 
				
			|||
            host='localhost', port=9200 | 
				
			|||
        )) | 
				
			|||
        .output('query', 'res') | 
				
			|||
) | 
				
			|||
 | 
				
			|||
res = es_search('test_index', query) | 
				
			|||
DataCollection(res).show() # Optional: display output data | 
				
			|||
``` | 
				
			|||
 | 
				
			|||
<br /> | 
				
			|||
 | 
				
			|||
## Factory Constructor | 
				
			|||
 | 
				
			|||
Create the operator via the following factory method: | 
				
			|||
 | 
				
			|||
***elasticsearch.search(host='localhost', port=9200, user=None, password=None, ca_certs=None)*** | 
				
			|||
 | 
				
			|||
**Parameters:** | 
				
			|||
 | 
				
			|||
***host***: *str* | 
				
			|||
 | 
				
			|||
The host to connect ElasticSearch client. | 
				
			|||
 | 
				
			|||
***port***: *int* | 
				
			|||
 | 
				
			|||
The port to connect ElasticSearch client. | 
				
			|||
 | 
				
			|||
***user***: *str* | 
				
			|||
 | 
				
			|||
The username to connect ElasticSearch client if needed, defaults to None. | 
				
			|||
 | 
				
			|||
***password***: *str* | 
				
			|||
 | 
				
			|||
The user password to connect ElasticSearch client if needed, defaults to None. | 
				
			|||
 | 
				
			|||
***ca_certs***: *str* | 
				
			|||
 | 
				
			|||
The path to CA certificates to connect ElasticSearch client if needed, defaults to None. | 
				
			|||
 | 
				
			|||
 | 
				
			|||
<br /> | 
				
			|||
 | 
				
			|||
**Returns:** | 
				
			|||
 | 
				
			|||
Search results wrapped by `elastic_transport.ObjectApiResponse`. | 
				
			|||
 | 
				
			|||
@ -0,0 +1,4 @@ | 
				
			|||
from .es_search import ESSearch | 
				
			|||
 | 
				
			|||
def osschat_search(*args, **kwargs): | 
				
			|||
    return ESSearch(*args, **kwargs) | 
				
			|||
@ -0,0 +1,44 @@ | 
				
			|||
import logging | 
				
			|||
from typing import Union, List | 
				
			|||
 | 
				
			|||
from elasticsearch import Elasticsearch | 
				
			|||
# import elasticsearch.helpers # type: ignore | 
				
			|||
 | 
				
			|||
from towhee.operator import PyOperator, SharedType # type: ignore | 
				
			|||
 | 
				
			|||
 | 
				
			|||
logger = logging.getLogger() | 
				
			|||
 | 
				
			|||
 | 
				
			|||
class ESSearch(PyOperator): | 
				
			|||
    """ | 
				
			|||
    Search using ElasticSearch with client ready | 
				
			|||
 | 
				
			|||
    Args: | 
				
			|||
        host (`str`): host to connect ElasticSearch client | 
				
			|||
        port (`int`): port to connect ElasticSearch client | 
				
			|||
        user (`str=None`): user name to connect ElasticSearch client, defaults to None | 
				
			|||
        password (`str=None`): user password to connect ElasticSearch client, defaults to None | 
				
			|||
        ca_certs (`str=None`): path to CA certificate, defaults to None | 
				
			|||
    """ | 
				
			|||
    def __init__(self, host: str, port: int, user: str = None, password: str = None, ca_certs: str = None): | 
				
			|||
        super().__init__() | 
				
			|||
        try: | 
				
			|||
            self.client = Elasticsearch( | 
				
			|||
                f'https://{host}:{port}', | 
				
			|||
                ca_certs=ca_certs, | 
				
			|||
                basic_auth=(user, password)) | 
				
			|||
            logger.info('Successfully connected to ElasticSearch client.') | 
				
			|||
        except Exception as e: | 
				
			|||
            logger.error('Failed to connect ElasticSearch client:\n', e) | 
				
			|||
            raise e | 
				
			|||
 | 
				
			|||
    def __call__(self, index_name: str, query: dict = {'match_all': {}}, enable: bool=True): | 
				
			|||
        if not enable: | 
				
			|||
            return None | 
				
			|||
        resp = self.client.search(index=index_name, query=query) | 
				
			|||
        return resp | 
				
			|||
 | 
				
			|||
    @property | 
				
			|||
    def shared_type(self): | 
				
			|||
        return SharedType.NotShareable | 
				
			|||
					Loading…
					
					
				
		Reference in new issue