From b19b53e7f563db82e925b8dd7776acb59c68fb1f Mon Sep 17 00:00:00 2001 From: shiyu22 Date: Wed, 14 Jun 2023 11:56:58 +0800 Subject: [PATCH] Add osschat-milvus --- README.md | 93 +++++++++++++++++++++++++++++++++++++++++++++++- __init__.py | 4 +++ milvus_client.py | 58 ++++++++++++++++++++++++++++++ requirements.txt | 1 + 4 files changed, 155 insertions(+), 1 deletion(-) create mode 100644 __init__.py create mode 100644 milvus_client.py create mode 100644 requirements.txt diff --git a/README.md b/README.md index bbc2d8f..52e759c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,93 @@ -# osschat-milvus +# ANN Insert Operator: MilvusClient + +
+ + + +## Desription + +Insert data into Milvus collections. **Please make sure you have [created Milvus Collection](https://milvus.io/docs/v2.0.x/create_collection.md) before loading the data.** + +
+ + + +## Code Example + +### Example + + +```python +import towhee + +from towhee import ops + +p = ( + towhee.pipe.input('collection_name', 'vec') + .map(('collection_name', 'vec'), (), ops.ann_insert.milvus_client(host='127.0.0.1', port='19530')) + .output() + ) + +p(vec) +``` + +### Load Collection + +> Please load the Collection after inserted data. + +```python +collection.load() +``` + +
+ + + +## Factory Constructor + +Create the operator via the following factory method: + +***ann_insert.milvus_client(host, port, user= None, password=None, collection_schema=None, index_params=None)*** + +**Parameters:** + +***host:*** *str* + +The host for Milvus. + +***port:*** *str* + +The port for Milvus. + +***user:*** *str* + +The user for Zilliz Cloud, defaults to None. + +***password:*** *str* + +The password for Zilliz Cloud, defaults to None. + + + +
+ + + +## Interface + +Insert Milvus data. + +**Parameters:** + +***collection_name:*** *str* + +The collection name for Milvus. + +***data:*** *list* + +The data to insert into milvus. + +**Returns:** MutationResult + +A MutationResult object contains `insert_count` represents how many and a `primary_keys` of primary keys. diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..aa80362 --- /dev/null +++ b/__init__.py @@ -0,0 +1,4 @@ +from .milvus_client import MilvusClient + +def osschat_milvus(*args, **kwargs): + return MilvusClient(*args, **kwargs) diff --git a/milvus_client.py b/milvus_client.py new file mode 100644 index 0000000..75a75c8 --- /dev/null +++ b/milvus_client.py @@ -0,0 +1,58 @@ +import uuid +import logging +from towhee.operator import PyOperator, SharedType +from pymilvus import connections, Collection + + +logger = logging.getLogger() + + +class MilvusClient(PyOperator): + """ + Milvus ANN index class. + """ + + def __init__(self, host: str, port: int, user: str = None, password: str = None): + self._host = host + self._port = port + self._connect_name = uuid.uuid4().hex + if None in [user, password]: + connections.connect(alias=self._connect_name, host=self._host, port=self._port) + else: + connections.connect(alias=self._connect_name, host=self._host, port=self._port, + user=user, password=password, secure=True) + + def __call__(self, collection_name: str, *data): + """ + Insert one row to Milvus. + + Args: + data (`list`): + The data to insert into milvus. + + Returns: + A MutationResult object contains `insert_count` represents how many and a `primary_keys` of primary keys. + + """ + self._collection = Collection(collection_name, using=self._connect_name) + row = [] + for item in data: + if isinstance(item, list): + row.extend([[i] for i in item]) + else: + row.append([item]) + mr = self._collection.insert(row) + if mr.insert_count != len(row[0]): + raise RuntimeError("Insert to milvus failed") + return mr + + @property + def shared_type(self): + return SharedType.NotShareable + + # def __del__(self): + # if connections.has_connection(self._connect_name): + # try: + # connections.disconnect(self._connect_name) + # except: + # pass diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..e0472f0 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +pymilvus