osschat-milvus
copied
4 changed files with 155 additions and 1 deletions
@ -1,2 +1,93 @@ |
|||||
# osschat-milvus |
|
||||
|
# ANN Insert Operator: MilvusClient |
||||
|
|
||||
|
|
||||
|
<br /> |
||||
|
|
||||
|
|
||||
|
|
||||
|
## Desription |
||||
|
|
||||
|
Insert data into Milvus collections. **Please make sure you have [created Milvus Collection](https://milvus.io/docs/v2.0.x/create_collection.md) before loading the data.** |
||||
|
|
||||
|
<br /> |
||||
|
|
||||
|
|
||||
|
|
||||
|
## Code Example |
||||
|
|
||||
|
### Example |
||||
|
|
||||
|
|
||||
|
```python |
||||
|
import towhee |
||||
|
|
||||
|
from towhee import ops |
||||
|
|
||||
|
p = ( |
||||
|
towhee.pipe.input('collection_name', 'vec') |
||||
|
.map(('collection_name', 'vec'), (), ops.ann_insert.milvus_client(host='127.0.0.1', port='19530')) |
||||
|
.output() |
||||
|
) |
||||
|
|
||||
|
p(vec) |
||||
|
``` |
||||
|
|
||||
|
### Load Collection |
||||
|
|
||||
|
> Please load the Collection after inserted data. |
||||
|
|
||||
|
```python |
||||
|
collection.load() |
||||
|
``` |
||||
|
|
||||
|
<br /> |
||||
|
|
||||
|
|
||||
|
|
||||
|
## Factory Constructor |
||||
|
|
||||
|
Create the operator via the following factory method: |
||||
|
|
||||
|
***ann_insert.milvus_client(host, port, user= None, password=None, collection_schema=None, index_params=None)*** |
||||
|
|
||||
|
**Parameters:** |
||||
|
|
||||
|
***host:*** *str* |
||||
|
|
||||
|
The host for Milvus. |
||||
|
|
||||
|
***port:*** *str* |
||||
|
|
||||
|
The port for Milvus. |
||||
|
|
||||
|
***user:*** *str* |
||||
|
|
||||
|
The user for Zilliz Cloud, defaults to None. |
||||
|
|
||||
|
***password:*** *str* |
||||
|
|
||||
|
The password for Zilliz Cloud, defaults to None. |
||||
|
|
||||
|
|
||||
|
|
||||
|
<br /> |
||||
|
|
||||
|
|
||||
|
|
||||
|
## Interface |
||||
|
|
||||
|
Insert Milvus data. |
||||
|
|
||||
|
**Parameters:** |
||||
|
|
||||
|
***collection_name:*** *str* |
||||
|
|
||||
|
The collection name for Milvus. |
||||
|
|
||||
|
***data:*** *list* |
||||
|
|
||||
|
The data to insert into milvus. |
||||
|
|
||||
|
**Returns:** MutationResult |
||||
|
|
||||
|
A MutationResult object contains `insert_count` represents how many and a `primary_keys` of primary keys. |
||||
|
@ -0,0 +1,4 @@ |
|||||
|
from .milvus_client import MilvusClient |
||||
|
|
||||
|
def osschat_milvus(*args, **kwargs): |
||||
|
return MilvusClient(*args, **kwargs) |
@ -0,0 +1,58 @@ |
|||||
|
import uuid |
||||
|
import logging |
||||
|
from towhee.operator import PyOperator, SharedType |
||||
|
from pymilvus import connections, Collection |
||||
|
|
||||
|
|
||||
|
logger = logging.getLogger() |
||||
|
|
||||
|
|
||||
|
class MilvusClient(PyOperator): |
||||
|
""" |
||||
|
Milvus ANN index class. |
||||
|
""" |
||||
|
|
||||
|
def __init__(self, host: str, port: int, user: str = None, password: str = None): |
||||
|
self._host = host |
||||
|
self._port = port |
||||
|
self._connect_name = uuid.uuid4().hex |
||||
|
if None in [user, password]: |
||||
|
connections.connect(alias=self._connect_name, host=self._host, port=self._port) |
||||
|
else: |
||||
|
connections.connect(alias=self._connect_name, host=self._host, port=self._port, |
||||
|
user=user, password=password, secure=True) |
||||
|
|
||||
|
def __call__(self, collection_name: str, *data): |
||||
|
""" |
||||
|
Insert one row to Milvus. |
||||
|
|
||||
|
Args: |
||||
|
data (`list`): |
||||
|
The data to insert into milvus. |
||||
|
|
||||
|
Returns: |
||||
|
A MutationResult object contains `insert_count` represents how many and a `primary_keys` of primary keys. |
||||
|
|
||||
|
""" |
||||
|
self._collection = Collection(collection_name, using=self._connect_name) |
||||
|
row = [] |
||||
|
for item in data: |
||||
|
if isinstance(item, list): |
||||
|
row.extend([[i] for i in item]) |
||||
|
else: |
||||
|
row.append([item]) |
||||
|
mr = self._collection.insert(row) |
||||
|
if mr.insert_count != len(row[0]): |
||||
|
raise RuntimeError("Insert to milvus failed") |
||||
|
return mr |
||||
|
|
||||
|
@property |
||||
|
def shared_type(self): |
||||
|
return SharedType.NotShareable |
||||
|
|
||||
|
# def __del__(self): |
||||
|
# if connections.has_connection(self._connect_name): |
||||
|
# try: |
||||
|
# connections.disconnect(self._connect_name) |
||||
|
# except: |
||||
|
# pass |
@ -0,0 +1 @@ |
|||||
|
pymilvus |
Loading…
Reference in new issue