# Copyright 2021 Zilliz. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Optional, Union, Any from towhee import ops, pipe, AutoPipes, AutoConfig from pydantic import BaseModel @AutoConfig.register class EnhancedQASearchConfig(BaseModel): """ Config of pipeline """ # config for sentence_embedding embedding_model: Optional[str] = 'all-MiniLM-L6-v2' openai_api_key: Optional[str] = None embedding_device: Optional[int] = -1 # config for search_milvus host: Optional[str] = '127.0.0.1' port: Optional[str] = '19530' collection_name: Optional[str] = 'chatbot' top_k: Optional[int] = 5 user: Optional[str] = None password: Optional[str] = None # config for similarity evaluation threshold: Optional[Union[float, int]] = 0.6 # config for llm llm_src: Optional[str] = 'openai' openai_model: Optional[str] = 'gpt-3.5-turbo' dolly_model: Optional[str] = 'databricks/dolly-v2-3b' customize_llm: Optional[Any] = None customize_prompt: Optional[Any] = None _hf_models = ops.sentence_embedding.transformers().get_op().supported_model_names() _sbert_models = ops.sentence_embedding.sbert().get_op().supported_model_names() _openai_models = ['text-embedding-ada-002', 'text-similarity-davinci-001', 'text-similarity-curie-001', 'text-similarity-babbage-001', 'text-similarity-ada-001'] def _get_embedding_op(config): if config.embedding_device == -1: device = 'cpu' else: device = config.embedding_device if config.embedding_model in _hf_models: return True, ops.sentence_embedding.transformers( model_name=config.embedding_model, device=device ) if config.embedding_model in _sbert_models: return True, ops.sentence_embedding.sbert( model_name=config.embedding_model, device=device ) if config.embedding_model in _openai_models: return False, ops.sentence_embedding.openai( model_name=config.embedding_model, api_key=config.openai_api_key ) raise RuntimeError('Unknown model: [%s], only support: %s' % (config.embedding_model, _hf_models + _sbert_models + _openai_models)) def _get_similarity_evaluation_op(config): return lambda x: [i for i in x if i[1] >= config.threshold] def _get_llm_op(config): if config.customize_llm: return config.customize_llm if config.llm_src.lower() == 'openai': return ops.LLM.OpenAI(model_name=config.openai_model, api_key=config.openai_api_key) if config.llm_src.lower() == 'dolly': return ops.LLM.Dolly(model_name=config.dolly_model) raise RuntimeError('Unknown llm source: [%s], only support \'openai\' and \'dolly\'' % (config.llm_src)) def _get_prompt(config): if config.customize_prompt: return config.customize_prompt return ops.prompt.question_answer(llm_name=config.llm_src) @AutoPipes.register def enhanced_qa_search_pipe(config): allow_triton, sentence_embedding_op = _get_embedding_op(config) sentence_embedding_config = {} if allow_triton: if config.embedding_device >= 0: sentence_embedding_config = AutoConfig.TritonGPUConfig(device_ids=[config.embedding_device], max_batch_size=128) else: sentence_embedding_config = AutoConfig.TritonCPUConfig() search_milvus_op = ops.ann_search.milvus_client( host=config.host, port=config.port, collection_name=config.collection_name, limit=config.top_k, output_fields=['text'], metric_type='IP', user=config.user, password=config.password, ) llm_op = _get_llm_op(config) p = ( pipe.input('question', 'history') .map('question', 'embedding', sentence_embedding_op, config=sentence_embedding_config) .map('embedding', 'embedding', ops.towhee.np_normalize()) .map('embedding', 'result', search_milvus_op) ) # if config.similarity_evaluation: if config.threshold: sim_eval_op = _get_similarity_evaluation_op(config) p = p.map('result', 'result', sim_eval_op) p = ( p.map('result', 'docs', lambda x: '\n'.join([i[2] for i in x])) .map(('question', 'docs', 'history'), 'prompt', _get_prompt(config)) .map('prompt', 'answer', llm_op) ) return p.output('answer')