# Copyright 2021 Zilliz. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import sys import os from pathlib import Path from urllib.parse import urlparse from collections import OrderedDict import torch from torchvision import transforms from timm.models.hub import download_cached_file from towhee import register from towhee.operator.base import NNOperator, OperatorFlag from towhee.types.arg import arg, to_image_color from towhee.types.image_utils import from_pil, to_pil def get_model(model): if isinstance(model, torch.nn.DataParallel) \ or isinstance(model, torch.nn.parallel.DistributedDataParallel): return model.module else: return model def is_url(url_or_filename): parsed = urlparse(url_or_filename) return parsed.scheme in ("http", "https") def load_checkpoint(url_or_filename, models, device): if is_url(url_or_filename): cached_file = download_cached_file(url_or_filename, check_hash=False, progress=True) checkpoint = torch.load(cached_file, map_location='cpu') elif os.path.isfile(url_or_filename): checkpoint = torch.load(url_or_filename, map_location='cpu') else: raise RuntimeError('checkpoint url or path is invalid') if is_url(url_or_filename): cached_file = download_cached_file(url_or_filename, check_hash=False, progress=True) checkpoint = torch.load(cached_file, map_location='cpu') elif os.path.isfile(url_or_filename): checkpoint = torch.load(url_or_filename, map_location='cpu') else: raise RuntimeError('checkpoint url or path is invalid') state_dict = OrderedDict() for k, v in checkpoint['state_dict'].items(): state_dict[k.replace('module.', '')] = v old_args = checkpoint['args'] model = getattr(models, old_args.model)(rand_embed=False, ssl_mlp_dim=old_args.ssl_mlp_dim, ssl_emb_dim=old_args.ssl_emb_dim) model.to(device) model.load_state_dict(state_dict, strict=True) return model @register(output_schema=['vec']) class Slip(NNOperator): """ SLIP multi-modal embedding operator """ def __init__(self, model_name: str, modality: str): super().__init__() sys.path.append(str(Path(__file__).parent)) import models from tokenizer import SimpleTokenizer self.tokenizer = SimpleTokenizer() self.device = "cuda" if torch.cuda.is_available() else "cpu" self._modality = modality self.model = load_checkpoint(self._configs()[model_name]['weights'], models, self.device) self.model.to(self.device) self.model.eval() self.tfms = transforms.Compose([ transforms.Resize(224), transforms.CenterCrop(224), lambda x: x.convert('RGB'), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]) ]) def __call__(self, data): if self._modality == 'image': vec = self._inference_from_image(data) elif self._modality == 'text': vec = self._inference_from_text(data) else: raise ValueError("modality[{}] not implemented.".format(self._modality)) vec = vec / vec.norm(dim=-1, keepdim=True) return vec.detach().cpu().numpy().flatten() def _inference_from_text(self, text): text = self.tokenizer(text).to(self.device) text = text.view(-1, 77).contiguous() embedding = get_model(self.model).encode_text(text) return embedding @arg(1, to_image_color('RGB')) def _inference_from_image(self, img): img = self._preprocess(img) img = img.to(self.device) embedding = get_model(self.model).encode_image(img) return embedding def _preprocess(self, img): img = to_pil(img) processed_img = self.tfms(img).unsqueeze(0).to(self.device) return processed_img def _configs(self): config = {} config['slip_vit_small'] = {} config['slip_vit_small']['weights'] = 'https://dl.fbaipublicfiles.com/slip/slip_small_100ep.pt' config['slip_vit_base'] = {} config['slip_vit_base']['weights'] = 'https://dl.fbaipublicfiles.com/slip/slip_base_100ep.pt' config['slip_vit_large'] = {} config['slip_vit_large']['weights'] = 'https://dl.fbaipublicfiles.com/slip/slip_large_100ep.pt' return config