# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from towhee import ops, pipe, AutoPipes, AutoConfig


@AutoConfig.register
class EnhancedQASearchConfig:
    """
    Config of pipeline
    """
    def __init__(self):
        # config for sentence_embedding
        self.model = 'all-MiniLM-L6-v2'
        self.openai_api_key = None
        # Optional user-supplied embedding operator. When it is not None,
        # _get_embedding_op returns it instead of building one from `model`.
        self.customize_embedding_op = None
        self.normalize_vec = True
        # -1 selects the CPU; any other value is passed through as the device.
        self.device = -1

        # config for search_milvus
        self.host = '127.0.0.1'
        self.port = '19530'
        self.collection_name = 'chatbot'
        self.top_k = 5
        self.metric_type = 'IP'
        self.output_fields = ['sentence']
        self.user = None
        self.password = None

        # config for similarity evaluation
        # A falsy threshold (None or 0) disables the result filter.
        self.threshold = 0.6
        # self.similarity_evaluation = 'score_filter'


_hf_models = ops.sentence_embedding.transformers().get_op().supported_model_names()
_sbert_models = ops.sentence_embedding.sbert().get_op().supported_model_names()
_openai_models = ['text-embedding-ada-002', 'text-similarity-davinci-001',
                  'text-similarity-curie-001', 'text-similarity-babbage-001',
                  'text-similarity-ada-001']


def _get_embedding_op(config):
    """
    Select the sentence-embedding operator described by `config`.

    Args:
        config: pipeline config; reads `device`, `model`, `openai_api_key`
            and, when present, `customize_embedding_op`.

    Returns:
        A `(allow_triton, op)` tuple. `allow_triton` is True for a customized
        operator and for the transformers / sbert operators, and False for
        the OpenAI operator.

    Raises:
        RuntimeError: if `config.model` is in none of the supported lists.
    """
    device = 'cpu' if config.device == -1 else config.device

    # Use getattr so that config objects which never defined the attribute
    # behave the same as ones that left it at None.
    custom_op = getattr(config, 'customize_embedding_op', None)
    if custom_op is not None:
        return True, custom_op

    if config.model in _hf_models:
        return True, ops.sentence_embedding.transformers(
            model_name=config.model, device=device
        )
    if config.model in _sbert_models:
        return True, ops.sentence_embedding.sbert(
            model_name=config.model, device=device
        )
    if config.model in _openai_models:
        return False, ops.sentence_embedding.openai(
            model_name=config.model, api_key=config.openai_api_key
        )

    # Report every accepted name, including the sbert models.
    supported = [*_hf_models, *_sbert_models, *_openai_models]
    raise RuntimeError('Unknown model: [%s], only support: %s' % (config.model, supported))


def _get_similarity_evaluation_op(config):
    """
    Build the callable that filters search results by `config.threshold`.

    The returned callable keeps the entries of its argument whose element at
    index 1 is greater than or equal to `config.threshold`; the threshold is
    read from `config` each time the callable runs.
    """
    # if config.similarity_evaluation == 'score_filter':
    # NOTE(review): index 1 is presumably the similarity score, and `>=` looks
    # right for 'IP'; confirm the comparison for distance metrics such as 'L2'.
    return lambda x: [i for i in x if i[1] >= config.threshold]


@AutoPipes.register
def enhanced_qa_search_pipe(config):
    """
    Build the question-answering search pipeline.

    The pipeline takes `question` and `history`, embeds the question,
    optionally normalizes the embedding, searches Milvus with it, optionally
    filters the hits by threshold, and outputs `question`, `history` and
    `result`.

    Args:
        config: pipeline config (see EnhancedQASearchConfig).

    Returns:
        The assembled pipeline.
    """
    allow_triton, sentence_embedding_op = _get_embedding_op(config)

    # Only operators flagged as Triton-capable get a Triton config; the
    # others run with an empty config.
    sentence_embedding_config = {}
    if allow_triton:
        if config.device >= 0:
            sentence_embedding_config = AutoConfig.TritonGPUConfig(
                device_ids=[config.device], max_batch_size=128
            )
        else:
            sentence_embedding_config = AutoConfig.TritonCPUConfig()

    search_milvus_op = ops.ann_search.milvus_client(
        host=config.host,
        port=config.port,
        collection_name=config.collection_name,
        limit=config.top_k,
        output_fields=config.output_fields,
        metric_type=config.metric_type,
        user=config.user,
        password=config.password,
    )

    p = (
        pipe.input('question', 'history')
        .map('question', 'embedding', sentence_embedding_op, config=sentence_embedding_config)
    )

    if config.normalize_vec:
        p = p.map('embedding', 'embedding', ops.towhee.np_normalize())

    p = p.map('embedding', 'result', search_milvus_op)

    # if config.similarity_evaluation:
    # Truthiness check kept on purpose: a threshold of None or 0 skips the filter.
    if config.threshold:
        sim_eval_op = _get_similarity_evaluation_op(config)
        p = p.map('result', 'result', sim_eval_op)

    return p.output('question', 'history', 'result')