|
|
@ -96,19 +96,19 @@ def osschat_insert_pipe(config): |
|
|
|
p = ( |
|
|
|
pipe.input('doc', 'project_name') |
|
|
|
.map('doc', 'text', data_loader) |
|
|
|
.flat_map('text', 'sentence', text_split_op) |
|
|
|
.map('sentence', 'embedding', sentence_embedding_op, config=sentence_embedding_config) |
|
|
|
.flat_map('text', 'chunk', text_split_op) |
|
|
|
.map('chunk', 'embedding', sentence_embedding_op, config=sentence_embedding_config) |
|
|
|
) |
|
|
|
if config.embedding_normalize: |
|
|
|
p = p.map('embedding', 'embedding', ops.towhee.np_normalize()) |
|
|
|
|
|
|
|
p = p.map(('project_name', 'doc', 'sentence', 'embedding'), 'milvus_res', insert_milvus_op) |
|
|
|
p = p.map(('project_name', 'doc', 'chunk', 'embedding'), 'milvus_res', insert_milvus_op) |
|
|
|
|
|
|
|
if config.es_enable: |
|
|
|
es_index_op = ops.elasticsearch.osschat_index(**config.es_connection_kwargs) |
|
|
|
p = ( |
|
|
|
p.map('sentence', 'es_sentence', lambda x: {'sentence': x}) |
|
|
|
.map(('project_name', 'es_sentence'), 'es_res', es_index_op) |
|
|
|
p.map('chunk', 'es_doc', lambda x: {'doc': x}) |
|
|
|
.map(('project_name', 'es_doc'), 'es_res', es_index_op) |
|
|
|
.map(('milvus_res', 'es_res'), 'res', lambda x, y: {'milvus_res': x, 'es_res': y}) |
|
|
|
) |
|
|
|
else: |
|
|
|