logo
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Readme
Files and versions

53 lines
1.4 KiB

import uuid
import logging
from towhee.operator import PyOperator, SharedType
from pymilvus import connections, Collection
logger = logging.getLogger()
class MilvusClient(PyOperator):
"""
Milvus ANN index class.
"""
def __init__(self, host: str, port: int, collection_name: str):
self._host = host
self._port = port
self._collection_name = collection_name
self._connect_name = uuid.uuid4().hex
connections.connect(alias=self._connect_name, host=self._host, port=self._port)
self._collection = Collection(self._collection_name, using=self._connect_name)
def __call__(self, *data):
"""
Insert data to Milvus.
Args:
data (`list`):
The data to insert into milvus.
Returns:
A MutationResult object contains `insert_count` represents how many and a `primary_keys` of primary keys.
"""
row = []
for item in data:
row.append([item])
mr = self._collection.insert(row)
if mr.err_count > 0:
raise RuntimeError("Insert to milvus failed")
return None
@property
def shared_type(self):
return SharedType.Shareable
def __del__(self):
if connections.has_connection(self._connect_name):
try:
connections.disconnect(self._connect_name)
except:
pass