towhee/osschat-insert
# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Dict, Optional, Any

from pydantic import BaseModel, Extra

from towhee import ops, pipe, AutoPipes, AutoConfig

@AutoConfig.register
class OSSChatInsertConfig(BaseModel, extra=Extra.allow):
    """
    Config of pipeline
    """
    # config for text_splitter
    type: Optional[str] = 'RecursiveCharacter'
    chunk_size: Optional[int] = 300
    splitter_kwargs: Optional[Dict[str, Any]] = {}
    # config for sentence_embedding
    embedding_model: Optional[str] = 'all-MiniLM-L6-v2'
    openai_api_key: Optional[str] = None
    embedding_device: Optional[int] = -1
    embedding_normalize: Optional[bool] = True
    # config for insert_milvus
    milvus_uri: Optional[str] = None
    milvus_token: Optional[str] = None
    milvus_host: Optional[str] = '127.0.0.1'
    milvus_port: Optional[str] = '19530'
    milvus_user: Optional[str] = None
    milvus_password: Optional[str] = None
    # config for elasticsearch
    es_enable: Optional[bool] = True
    es_connection_kwargs: Optional[dict] = {'hosts': ['https://127.0.0.1:9200'], 'basic_auth': ('elastic', 'my_password')}
    # token count
    token_model: Optional[str] = 'gpt-3.5-turbo'

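# A minimal sketch of how these defaults are typically overridden before the
# pipeline is built (this assumes the pipeline is published on the Towhee hub
# as 'osschat-insert'; the values shown are placeholders):
#
#     from towhee import AutoConfig
#
#     config = AutoConfig.load_config('osschat-insert')
#     config.embedding_model = 'all-MiniLM-L6-v2'
#     config.chunk_size = 200
#     config.es_enable = False  # skip Elasticsearch indexing
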
_hf_models = ops.sentence_embedding.transformers().get_op().supported_model_names()
_sbert_models = ops.sentence_embedding.sbert().get_op().supported_model_names()
_openai_models = ['text-embedding-ada-002', 'text-similarity-davinci-001',
                  'text-similarity-curie-001', 'text-similarity-babbage-001',
                  'text-similarity-ada-001']


def _get_embedding_op(config):
    # Returns (allow_triton, op): allow_triton is True for local models that can
    # be served with Triton, and False when the OpenAI embedding service is used.
    if config.embedding_device == -1:
        device = 'cpu'
    else:
        device = config.embedding_device

    if config.embedding_model in _hf_models:
        return True, ops.sentence_embedding.transformers(model_name=config.embedding_model, device=device)
    elif config.embedding_model in _openai_models:
        return False, ops.sentence_embedding.openai(model_name=config.embedding_model, api_key=config.openai_api_key)
    else:
        return True, ops.sentence_embedding.sbert(model_name=config.embedding_model, device=device)


def data_loader(path):
    # Pick a loader operator based on the file extension; fall back to plain text.
    if path.endswith('pdf'):
        op = ops.data_loader.pdf_loader()
    elif path.endswith(('xls', 'xlsx')):
        op = ops.data_loader.excel_loader()
    elif path.endswith('ppt'):
        op = ops.data_loader.powerpoint_loader()
    else:
        op = ops.text_loader()
    return op(path)


@AutoPipes.register
def osschat_insert_pipe(config):
    text_split_op = ops.text_splitter(type=config.type,
                                      chunk_size=config.chunk_size,
                                      **config.splitter_kwargs)

    allow_triton, sentence_embedding_op = _get_embedding_op(config)
    sentence_embedding_config = {}
    if allow_triton:
        if config.embedding_device >= 0:
            sentence_embedding_config = AutoConfig.TritonGPUConfig(device_ids=[config.embedding_device], max_batch_size=128)
        else:
            sentence_embedding_config = AutoConfig.TritonCPUConfig()

    insert_milvus_op = ops.ann_insert.osschat_milvus(uri=config.milvus_uri,
                                                     host=config.milvus_host,
                                                     port=config.milvus_port,
                                                     token=config.milvus_token,
                                                     user=config.milvus_user,
                                                     password=config.milvus_password,
                                                     )

    # load the document -> split into chunks -> count tokens -> embed each chunk
    p = (
        pipe.input('doc', 'project_name')
        .map('doc', 'text', data_loader)
        .flat_map('text', 'chunk', text_split_op)
        .map('chunk', 'token_count', ops.token_counter(config.token_model))
        .map('chunk', 'embedding', sentence_embedding_op, config=sentence_embedding_config)
    )
    if config.embedding_normalize:
        p = p.map('embedding', 'embedding', ops.towhee.np_normalize())

    # insert the chunk embeddings into Milvus
    p = p.map(('project_name', 'doc', 'chunk', 'embedding'), 'milvus_res', insert_milvus_op)

    # optionally index the raw chunks into Elasticsearch as well
    if config.es_enable:
        es_index_op = ops.elasticsearch.osschat_index(**config.es_connection_kwargs)
        p = (
            p.map('chunk', 'es_doc', lambda x: {'doc': x})
            .map(('project_name', 'es_doc'), 'es_res', es_index_op)
            .map(('milvus_res', 'es_res', 'token_count'), 'res', lambda x, y, c: {'milvus_res': x, 'es_res': y, 'token_count': c})
        )
    else:
        p = p.map(('milvus_res', 'token_count'), 'res', lambda x, c: {'milvus_res': x, 'es_res': None, 'token_count': c})
    return p.output('res')
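

# Example usage -- a minimal sketch, assuming this pipeline is published on the
# Towhee hub as 'osschat-insert', a Milvus instance is reachable at the
# configured host/port, and Elasticsearch indexing is disabled. The document
# path and project name below are placeholders.
if __name__ == '__main__':
    from towhee import AutoPipes, AutoConfig

    config = AutoConfig.load_config('osschat-insert')
    config.milvus_host = '127.0.0.1'
    config.milvus_port = '19530'
    config.es_enable = False

    insert_pipe = AutoPipes.pipeline('osschat-insert', config=config)
    res = insert_pipe('path/to/document.pdf', 'my_project')
    print(res.to_list())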