import uuid
import logging

from towhee.operator import PyOperator
from pymilvus import connections, Collection

logger = logging.getLogger()


class MilvusClient(PyOperator):
    """
    Milvus ANN index class.

    Operator that inserts data into an existing Milvus collection. Each
    instance registers its own pymilvus connection under a random alias
    (a uuid4 hex string) and binds the named collection to that alias.
    """

    def __init__(self, host: str, port: int, collection_name: str):
        """
        Connect to Milvus and bind the target collection.

        Args:
            host (`str`):
                Host of the Milvus server.
            port (`int`):
                Port of the Milvus server.
            collection_name (`str`):
                Name of the collection that rows are inserted into.
        """
        # Let the operator base class set up its own state.
        super().__init__()
        self._host = host
        self._port = port
        self._collection_name = collection_name
        # A unique alias per instance keeps this connection separate from
        # any other connection registered in the same process.
        self._connect_name = uuid.uuid4().hex
        connections.connect(alias=self._connect_name, host=self._host, port=self._port)
        self._collection = Collection(self._collection_name, using=self._connect_name)

    def __call__(self, *row):
        """
        Insert data to Milvus.

        Args:
            *row:
                Positional values forwarded together, as a single list, to
                `Collection.insert`. Presumably one entry per collection
                field in schema order -- confirm against the collection
                schema used by the caller.

        Returns:
            None.

        Raises:
            RuntimeError: If the insert result reports `err_count > 0`.
        """
        # `*row` arrives as a tuple; `Collection.insert` is documented to take
        # a list (or DataFrame), so hand it a list.
        mr = self._collection.insert(list(row))
        if mr.err_count > 0:
            raise RuntimeError("Insert to milvus failed")
        return None

    def __del__(self):
        """Disconnect the alias registered by this instance, if any."""
        # `__init__` may have failed before the alias was assigned.
        alias = getattr(self, "_connect_name", None)
        if alias is None:
            return
        try:
            if connections.has_connection(alias):
                connections.disconnect(alias)
        except Exception:  # pylint: disable=broad-except
            # Best-effort cleanup: a finalizer must never raise, and the
            # connection registry may already be gone at interpreter exit.
            pass