|
@ -21,13 +21,13 @@ class ESSearch(PyOperator): |
|
|
password (`str=None`): user password to connect ElasticSearch client, defaults to None |
|
|
password (`str=None`): user password to connect ElasticSearch client, defaults to None |
|
|
ca_certs (`str=None`): path to CA certificate, defaults to None |
|
|
ca_certs (`str=None`): path to CA certificate, defaults to None |
|
|
""" |
|
|
""" |
|
|
def __init__(self, host: str, port: int, user: str = None, password: str = None, ca_certs: str = None): |
|
|
|
|
|
|
|
|
def __init__(self, **connection_kwargs): |
|
|
super().__init__() |
|
|
super().__init__() |
|
|
try: |
|
|
try: |
|
|
self.client = Elasticsearch( |
|
|
|
|
|
f'http://{host}:{port}' if not ca_certs else f'https://{host}:{port}', |
|
|
|
|
|
ca_certs=ca_certs, |
|
|
|
|
|
basic_auth=(user, password) if user and password else None) |
|
|
|
|
|
|
|
|
if 'port' in connection_kwargs: |
|
|
|
|
|
assert 'host' in connection_kwargs, 'Missing port in connection kwargs but given port only.' |
|
|
|
|
|
connection_kwargs['hosts'] = [f'https://{host}:{port}'] |
|
|
|
|
|
self.client = Elasticsearch(**connection_kwargs) |
|
|
logger.info('Successfully connected to ElasticSearch client.') |
|
|
logger.info('Successfully connected to ElasticSearch client.') |
|
|
except Exception as e: |
|
|
except Exception as e: |
|
|
logger.error('Failed to connect ElasticSearch client:\n', e) |
|
|
logger.error('Failed to connect ElasticSearch client:\n', e) |
|
|