# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Optional, Any

from pydantic import BaseModel, Extra

from towhee import ops, pipe, AutoPipes, AutoConfig


@AutoConfig.register
class OSSChatInsertConfig(BaseModel, extra=Extra.allow):
    """
    Config of pipeline

    Groups the settings read by ``osschat_insert_pipe``: text splitting,
    sentence embedding, Milvus insertion, optional Elasticsearch indexing
    and token counting. Extra (undeclared) fields are allowed.
    """
    # config for text_splitter
    # `type` and `chunk_size` are passed to ops.text_splitter; `splitter_kwargs`
    # is expanded as additional keyword arguments to the same op.
    type: Optional[str] = 'RecursiveCharacter'
    chunk_size: Optional[int] = 300
    splitter_kwargs: Optional[Dict[str, Any]] = {}
    # config for sentence_embedding
    # `embedding_device` of -1 selects 'cpu'; any other value is passed to the
    # embedding op as the device and, when >= 0, used as the Triton GPU id.
    embedding_model: Optional[str] = 'all-MiniLM-L6-v2'
    openai_api_key: Optional[str] = None
    embedding_device: Optional[int] = -1
    embedding_normalize: Optional[bool] = True
    # config for insert_milvus
    milvus_uri: Optional[str] = None
    milvus_token: Optional[str] = None
    milvus_host: Optional[str] = '127.0.0.1'
    milvus_port: Optional[str] = '19530'
    milvus_user: Optional[str] = None
    milvus_password: Optional[str] = None
    # config for elasticsearch
    # `es_connection_kwargs` is expanded as keyword arguments to
    # ops.elasticsearch.osschat_index when `es_enable` is true.
    es_enable: Optional[bool] = True
    es_connection_kwargs: Optional[dict] = {'hosts': ['https://127.0.0.1:9200'], 'basic_auth': ('elastic', 'my_password')}
    # token count
    token_model: Optional[str] = 'gpt-3.5-turbo'


# Model-name lists used to choose the embedding operator. The first two are
# queried from the operators themselves when this module is imported.
_hf_models = ops.sentence_embedding.transformers().get_op().supported_model_names()
_sbert_models = ops.sentence_embedding.sbert().get_op().supported_model_names()
_openai_models = ['text-embedding-ada-002', 'text-similarity-davinci-001',
                  'text-similarity-curie-001', 'text-similarity-babbage-001',
                  'text-similarity-ada-001']


def _get_embedding_op(config):
    """
    Select the sentence-embedding operator for ``config.embedding_model``.

    Args:
        config: object exposing ``embedding_model``, ``embedding_device`` and
            ``openai_api_key`` (e.g. an ``OSSChatInsertConfig``).

    Returns:
        A ``(allow_triton, op)`` tuple. ``allow_triton`` is True for the
        transformers and sbert operators and False for the OpenAI operator;
        the caller uses it to decide whether to attach a Triton config.
    """
    if config.embedding_device == -1:
        device = 'cpu'
    else:
        device = config.embedding_device

    if config.embedding_model in _hf_models:
        return True, ops.sentence_embedding.transformers(model_name=config.embedding_model,
                                                         device=device)
    elif config.embedding_model in _openai_models:
        return False, ops.sentence_embedding.openai(model_name=config.embedding_model,
                                                    api_key=config.openai_api_key)
    else:
        # Any model not listed above is handed to the sbert operator.
        return True, ops.sentence_embedding.sbert(model_name=config.embedding_model,
                                                  device=device)


def data_loader(path):
    """
    Load a document with the loader operator matching its file suffix.

    The suffix check is case-insensitive. Paths ending in ``pdf`` use the PDF
    loader, ``xls``/``xlsx`` the Excel loader, ``ppt`` the PowerPoint loader;
    everything else goes through the generic text loader.

    Args:
        path (str): location of the document to load.

    Returns:
        Whatever the selected loader operator returns for ``path``.
    """
    # Compare on a lower-cased copy so 'REPORT.PDF' or 'Sheet.XLSX' are routed
    # correctly; the original path is still what gets passed to the loader.
    suffix_source = path.lower()
    if suffix_source.endswith('pdf'):
        op = ops.data_loader.pdf_loader()
    elif suffix_source.endswith(('xls', 'xlsx')):
        op = ops.data_loader.excel_loader()
    elif suffix_source.endswith('ppt'):
        # NOTE(review): '.pptx' is not matched here and falls through to the
        # text loader; confirm whether powerpoint_loader supports it before
        # adding it.
        op = ops.data_loader.powerpoint_loader()
    else:
        op = ops.text_loader()
    return op(path)


@AutoPipes.register
def osschat_insert_pipe(config):
    """
    Build the document-insert pipeline.

    The pipeline takes ``(doc, project_name)``, loads ``doc``, splits it into
    chunks, counts tokens and embeds each chunk, inserts the result into
    Milvus and, when ``config.es_enable`` is true, also indexes the chunk in
    Elasticsearch.

    Args:
        config: an ``OSSChatInsertConfig``-like object.

    Returns:
        The pipeline, whose single output column ``res`` is a dict with keys
        ``'milvus_res'``, ``'es_res'`` (None when Elasticsearch is disabled)
        and ``'token_count'``.
    """
    text_split_op = ops.text_splitter(type=config.type,
                                      chunk_size=config.chunk_size,
                                      **config.splitter_kwargs)

    allow_triton, sentence_embedding_op = _get_embedding_op(config)
    # Only operators flagged as Triton-capable get a Triton config; otherwise
    # an empty config is passed to the embedding step.
    sentence_embedding_config = {}
    if allow_triton:
        if config.embedding_device >= 0:
            sentence_embedding_config = AutoConfig.TritonGPUConfig(device_ids=[config.embedding_device],
                                                                   max_batch_size=128)
        else:
            sentence_embedding_config = AutoConfig.TritonCPUConfig()

    insert_milvus_op = ops.ann_insert.osschat_milvus(uri=config.milvus_uri,
                                                     host=config.milvus_host,
                                                     port=config.milvus_port,
                                                     token=config.milvus_token,
                                                     user=config.milvus_user,
                                                     password=config.milvus_password,
                                                     )

    p = (
        pipe.input('doc', 'project_name')
            .map('doc', 'text', data_loader)
            .flat_map('text', 'chunk', text_split_op)
            .map('chunk', 'token_count', ops.token_counter(config.token_model))
            .map('chunk', 'embedding', sentence_embedding_op, config=sentence_embedding_config)
    )

    if config.embedding_normalize:
        # Overwrite the embedding column with its normalized form.
        p = p.map('embedding', 'embedding', ops.towhee.np_normalize())

    p = p.map(('project_name', 'doc', 'chunk', 'embedding'), 'milvus_res', insert_milvus_op)

    if config.es_enable:
        es_index_op = ops.elasticsearch.osschat_index(**config.es_connection_kwargs)
        p = (
            p.map('chunk', 'es_doc', lambda x: {'doc': x})
             .map(('project_name', 'es_doc'), 'es_res', es_index_op)
             .map(('milvus_res', 'es_res', 'token_count'), 'res',
                  lambda x, y, c: {'milvus_res': x, 'es_res': y, 'token_count': c})
        )
    else:
        # Keep the same result shape with a None placeholder for 'es_res'.
        p = p.map(('milvus_res', 'token_count'), 'res',
                  lambda x, c: {'milvus_res': x, 'es_res': None, 'token_count': c})

    return p.output('res')