|
|
|
import os
|
|
|
|
import onnxruntime
|
|
|
|
from transformers.onnx.features import FeaturesManager
|
|
|
|
from transformers.onnx import validate_model_outputs
|
|
|
|
|
|
|
|
import towhee
|
|
|
|
from towhee import ops
|
|
|
|
from pymilvus import connections, DataType, FieldSchema, Collection, CollectionSchema, utility
|
|
|
|
from datasets import load_dataset
|
|
|
|
|
|
|
|
from statistics import mode
|
|
|
|
import argparse
|
|
|
|
|
|
|
|
import transformers
|
|
|
|
|
|
|
|
transformers.logging.set_verbosity_error()
|
|
|
|
|
|
|
|
parser = argparse.ArgumentParser()
|
|
|
|
parser.add_argument('--model', required=True, type=str)
|
|
|
|
parser.add_argument('--dataset', type=str, default='imdb')
|
|
|
|
parser.add_argument('--insert_size', type=int, default=1000)
|
|
|
|
parser.add_argument('--query_size', type=int, default=100)
|
|
|
|
parser.add_argument('--topk', type=int, default=10)
|
|
|
|
parser.add_argument('--collection_name', type=str, default=None)
|
|
|
|
parser.add_argument('--format', type=str, required=True)
|
|
|
|
parser.add_argument('--onnx_dir', type=str, default='../saved/onnx')
|
|
|
|
|
|
|
|
args = parser.parse_args()
|
|
|
|
model_name = args.model
|
|
|
|
onnx_path = os.path.join(args.onnx_dir, model_name.replace('/', '-') + '.onnx')
|
|
|
|
dataset_name = args.dataset
|
|
|
|
insert_size = args.insert_size
|
|
|
|
query_size = args.query_size
|
|
|
|
topk = args.topk
|
|
|
|
collection_name = args.collection_name if args.collection_name else model_name.replace('-', '_').replace('/', '_')
|
|
|
|
|
|
|
|
device = 'cpu'
|
|
|
|
host = 'localhost'
|
|
|
|
port = '19530'
|
|
|
|
index_type = 'FLAT'
|
|
|
|
metric_type = 'L2'
|
|
|
|
|
|
|
|
data = load_dataset(dataset_name).shuffle(seed=32)
|
|
|
|
assert insert_size <= len(data['train']), 'There is no enough data. Please decrease insert size.'
|
|
|
|
assert insert_size <= len(data['test']), 'There is no enough data. Please decrease query size.'
|
|
|
|
|
|
|
|
insert_data = data['train']
|
|
|
|
insert_data = insert_data[:insert_size] if insert_size else insert_data
|
|
|
|
query_data = data['test']
|
|
|
|
query_data = query_data[:query_size] if query_size else query_data
|
|
|
|
|
|
|
|
# Warm up
|
|
|
|
print('Warming up...')
|
|
|
|
op = ops.text_embedding.transformers(model_name=model_name, device=device).get_op()
|
|
|
|
dim = op('This is test.').shape[-1]
|
|
|
|
print(f'output dim: {dim}')
|
|
|
|
|
|
|
|
# Prepare Milvus
|
|
|
|
print('Connecting milvus ...')
|
|
|
|
connections.connect(host=host, port=port)
|
|
|
|
|
|
|
|
|
|
|
|
def create_milvus(collection_name):
|
|
|
|
print('Creating collection ...')
|
|
|
|
fields = [
|
|
|
|
FieldSchema(name='id', dtype=DataType.INT64, description='embedding id', is_primary=True, auto_id=True),
|
|
|
|
FieldSchema(name='embedding', dtype=DataType.FLOAT_VECTOR, description='text embedding', dim=dim),
|
|
|
|
FieldSchema(name='label', dtype=DataType.VARCHAR, description='label', max_length=500)
|
|
|
|
]
|
|
|
|
schema = CollectionSchema(fields=fields, description=f'text embeddings for {model_name} on {dataset_name}')
|
|
|
|
if utility.has_collection(collection_name):
|
|
|
|
print(f'drop old collection: {collection_name}')
|
|
|
|
collection = Collection(collection_name)
|
|
|
|
collection.drop()
|
|
|
|
collection = Collection(name=collection_name, schema=schema)
|
|
|
|
print(f'A new collection is created: {collection_name}.')
|
|
|
|
return collection
|
|
|
|
|
|
|
|
|
|
|
|
if args.format == 'pytorch':
|
|
|
|
collection_name = collection_name + '_pytorch'
|
|
|
|
|
|
|
|
def insert(model_name, collection_name):
|
|
|
|
(
|
|
|
|
towhee.dc['text', 'label'](zip(insert_data['text'], insert_data['label'])).stream()
|
|
|
|
.runas_op['text', 'text'](lambda s: s[:1024])
|
|
|
|
.text_embedding.transformers['text', 'emb'](model_name=model_name, device=device)
|
|
|
|
.runas_op['emb', 'emb'](lambda x: x[0])
|
|
|
|
.runas_op['label', 'label'](lambda y: str(y))
|
|
|
|
.ann_insert.milvus[('emb', 'label'), 'miluvs_insert'](
|
|
|
|
uri=f'tcp://{host}:{port}/{collection_name}'
|
|
|
|
)
|
|
|
|
.show(3)
|
|
|
|
)
|
|
|
|
collection = Collection(collection_name)
|
|
|
|
return collection.num_entities
|
|
|
|
|
|
|
|
|
|
|
|
def query(model_name, collection_name):
|
|
|
|
benchmark = (
|
|
|
|
towhee.dc['text', 'gt'](zip(query_data['text'], query_data['label'])).stream()
|
|
|
|
.runas_op['text', 'text'](lambda s: s[:1024])
|
|
|
|
.text_embedding.transformers['text', 'emb'](model_name=model_name, device=device)
|
|
|
|
.runas_op['emb', 'emb'](lambda x: x[0])
|
|
|
|
.runas_op['gt', 'gt'](lambda y: str(y))
|
|
|
|
.ann_search.milvus['emb', 'milvus_res'](
|
|
|
|
uri=f'tcp://{host}:{port}/{collection_name}',
|
|
|
|
metric_type=metric_type,
|
|
|
|
limit=topk,
|
|
|
|
output_fields=['label']
|
|
|
|
)
|
|
|
|
.runas_op['milvus_res', 'preds'](lambda x: [y.label for y in x]).unstream()
|
|
|
|
.runas_op['preds', 'pred1'](lambda x: mode(x[:1]))
|
|
|
|
.runas_op['preds', 'pred5'](lambda x: mode(x[:5]))
|
|
|
|
.runas_op['preds', 'pred10'](lambda x: mode(x[:10]))
|
|
|
|
.with_metrics(['accuracy'])
|
|
|
|
.evaluate['gt', 'pred1']('pred1')
|
|
|
|
.evaluate['gt', 'pred5']('pred5')
|
|
|
|
.evaluate['gt', 'pred10']('pred10')
|
|
|
|
.report()
|
|
|
|
)
|
|
|
|
return benchmark
|
|
|
|
elif args.format == 'onnx':
|
|
|
|
collection_name = collection_name + '_onnx'
|
|
|
|
saved_name = model_name.replace('/', '-')
|
|
|
|
if not os.path.exists(onnx_path):
|
|
|
|
try:
|
|
|
|
op.save_model(format='onnx', path=onnx_path[:-5])
|
|
|
|
except Exception:
|
|
|
|
inputs = op.tokenizer('This is test.', return_tensors='pt')
|
|
|
|
input_names = list(inputs.keys())
|
|
|
|
dynamic_axes = {}
|
|
|
|
for i_n in input_names:
|
|
|
|
dynamic_axes[i_n] = {0: 'batch_size', 1: 'sequence_length'}
|
|
|
|
output_names = ['last_hidden_state']
|
|
|
|
for o_n in output_names:
|
|
|
|
dynamic_axes[o_n] = {0: 'batch_size', 1: 'sequence_length'}
|
|
|
|
torch.onnx.export(
|
|
|
|
op.model,
|
|
|
|
tuple(inputs.values()),
|
|
|
|
onnx_path,
|
|
|
|
input_names=input_names,
|
|
|
|
output_names=output_names,
|
|
|
|
dynamic_axes=dynamic_axes,
|
|
|
|
opset_version=14,
|
|
|
|
do_constant_folding=True,
|
|
|
|
)
|
|
|
|
sess = onnxruntime.InferenceSession(onnx_path,
|
|
|
|
providers=onnxruntime.get_available_providers())
|
|
|
|
|
|
|
|
@towhee.register
|
|
|
|
def run_onnx(txt):
|
|
|
|
inputs = op.tokenizer(txt, return_tensors='np')
|
|
|
|
try:
|
|
|
|
model_kind, model_onnx_config = FeaturesManager.check_supported_model_or_raise(
|
|
|
|
op.model, feature='default')
|
|
|
|
onnx_config = model_onnx_config(op.model.config)
|
|
|
|
new_inputs = onnx_config.generate_dummy_inputs_onnxruntime(inputs)
|
|
|
|
onnx_inputs = {}
|
|
|
|
for name, value in new_inputs.items():
|
|
|
|
if isinstance(value, (list, tuple)):
|
|
|
|
value = onnx_config.flatten_output_collection_property(name, value)
|
|
|
|
onnx_inputs.update({tensor_name: pt_tensor.numpy() for tensor_name, pt_tensor in value.items()})
|
|
|
|
else:
|
|
|
|
onnx_inputs[name] = value.numpy()
|
|
|
|
outs = sess.run(output_names=['last_hidden_state'], input_feed=dict(onnx_inputs))
|
|
|
|
except Exception:
|
|
|
|
onnx_inputs = [x.name for x in sess.get_inputs()]
|
|
|
|
new_inputs = {}
|
|
|
|
for k in onnx_inputs:
|
|
|
|
new_inputs[k] = inputs[k]
|
|
|
|
outs = sess.run(output_names=['last_hidden_state'], input_feed=dict(new_inputs))
|
|
|
|
return outs[0].squeeze(0)
|
|
|
|
|
|
|
|
def insert(model_name, collection_name):
|
|
|
|
(
|
|
|
|
towhee.dc['text', 'label'](zip(insert_data['text'], insert_data['label'])).stream()
|
|
|
|
.runas_op['text', 'text'](lambda s: s[:1024])
|
|
|
|
.run_onnx['text', 'emb']()
|
|
|
|
.runas_op['emb', 'emb'](lambda x: x[0])
|
|
|
|
.runas_op['label', 'label'](lambda y: str(y))
|
|
|
|
.ann_insert.milvus[('emb', 'label'), 'miluvs_insert'](
|
|
|
|
uri=f'tcp://{host}:{port}/{collection_name}'
|
|
|
|
)
|
|
|
|
.show(3)
|
|
|
|
)
|
|
|
|
collection = Collection(collection_name)
|
|
|
|
return collection.num_entities
|
|
|
|
|
|
|
|
def query(model_name, collection_name):
|
|
|
|
benchmark = (
|
|
|
|
towhee.dc['text', 'gt'](zip(query_data['text'], query_data['label'])).stream()
|
|
|
|
.runas_op['text', 'text'](lambda s: s[:1024])
|
|
|
|
.run_onnx['text', 'emb']()
|
|
|
|
.runas_op['emb', 'emb'](lambda x: x[0])
|
|
|
|
.runas_op['gt', 'gt'](lambda y: str(y))
|
|
|
|
.ann_search.milvus['emb', 'milvus_res'](
|
|
|
|
uri=f'tcp://{host}:{port}/{collection_name}',
|
|
|
|
metric_type=metric_type,
|
|
|
|
limit=topk,
|
|
|
|
output_fields=['label']
|
|
|
|
)
|
|
|
|
.runas_op['milvus_res', 'preds'](lambda x: [y.label for y in x]).unstream()
|
|
|
|
.runas_op['preds', 'pred1'](lambda x: mode(x[:1]))
|
|
|
|
.runas_op['preds', 'pred5'](lambda x: mode(x[:5]))
|
|
|
|
.runas_op['preds', 'pred10'](lambda x: mode(x[:10]))
|
|
|
|
.with_metrics(['accuracy'])
|
|
|
|
.evaluate['gt', 'pred1']('pred1')
|
|
|
|
.evaluate['gt', 'pred5']('pred5')
|
|
|
|
.evaluate['gt', 'pred10']('pred10')
|
|
|
|
.report()
|
|
|
|
)
|
|
|
|
return benchmark
|
|
|
|
else:
|
|
|
|
raise AttributeError('Only support "pytorch" and "onnx" as format.')
|
|
|
|
|
|
|
|
collection = create_milvus(collection_name)
|
|
|
|
insert_count = insert(model_name, collection_name)
|
|
|
|
print('Total data inserted:', insert_count)
|
|
|
|
benchmark = query(model_name, collection_name)
|