# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from pathlib import Path

from PIL import Image
import torch
import yaml
from torchvision import transforms

from towhee.types.image_utils import to_pil
from towhee.operator.base import NNOperator, OperatorFlag
from towhee.types.arg import arg, to_image_color
from towhee import register

# NOTE(review): `ALBEF`, `BertTokenizer` and `interpolate_pos_embed` are used
# below but are not imported anywhere in this file. They have to be brought
# into scope (presumably from the ALBEF model package -- confirm the module
# paths) before this operator can be instantiated.


@register(output_schema=['vec'])
class Albef(NNOperator):
    """
    ALBEF multi-modal embedding operator

    Args:
        model_name: key into the table returned by ``_configs()``
            (currently only ``'albef_4m'``).
        modality: ``'image'`` or ``'text'``; selects which inference path
            ``inference_single_data`` dispatches to.
    """

    @staticmethod
    def prepare_model(checkpoint_path, model):
        """Load the weights stored at ``checkpoint_path`` into ``model``.

        The checkpoint's ``'model'`` entry is used as the state dict. Both
        positional-embedding tensors are passed through
        ``interpolate_pos_embed`` together with the matching encoder of
        ``model``, and every key containing ``'bert'`` has the substring
        ``'bert.'`` removed before loading.

        This is a static method so that both ``Albef.prepare_model(p, m)``
        and ``self.prepare_model(p, m)`` work; as a plain function in the
        class body the latter call raised ``TypeError``.

        Args:
            checkpoint_path: path of the checkpoint file given to
                ``torch.load``.
            model: object exposing ``visual_encoder``, ``visual_encoder_m``
                and ``load_state_dict``.

        Returns:
            The same ``model`` object, after ``load_state_dict``.
        """
        # NOTE(security): torch.load unpickles the file; only point this at
        # checkpoints from a trusted source.
        checkpoint = torch.load(checkpoint_path, map_location='cpu')
        state_dict = checkpoint['model']

        for encoder_name in ('visual_encoder', 'visual_encoder_m'):
            pos_key = encoder_name + '.pos_embed'
            state_dict[pos_key] = interpolate_pos_embed(
                state_dict[pos_key], getattr(model, encoder_name))

        # Iterate over a snapshot of the keys because the dict is mutated.
        for key in list(state_dict.keys()):
            if 'bert' in key:
                # pop-then-assign keeps the entry even when the key has no
                # 'bert.' substring (replace() is then a no-op).
                state_dict[key.replace('bert.', '')] = state_dict.pop(key)

        # strict=False: keys that do not match the model are ignored.
        model.load_state_dict(state_dict, strict=False)
        print(f'load checkpoint from {checkpoint_path}')
        return model

    def __init__(self, model_name: str, modality: str):
        """Build the tokenizer, model and image transform for ``model_name``.

        Args:
            model_name: key into ``_configs()``.
            modality: ``'image'`` or ``'text'``.

        Raises:
            KeyError: if ``model_name`` is not a key of ``_configs()``.
        """
        # Initialise the NNOperator base before setting our own attributes.
        super().__init__()
        self.modality = modality
        config = self._configs()[model_name]
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        normalize = transforms.Normalize(
            (0.48145466, 0.4578275, 0.40821073),
            (0.26862954, 0.26130258, 0.27577711),
        )

        # from_pretrained expects the tokenizer identifier, not the whole
        # per-model settings dict.
        tokenizer = BertTokenizer.from_pretrained(config['tokenizer'])

        # NOTE(review): `config` here is the small per-model dict from
        # _configs(); the ALBEF constructor may actually expect the YAML
        # settings loaded below -- confirm against the ALBEF class.
        model = ALBEF(
            config=config,
            text_encoder=config['text_encoder'],
            tokenizer=tokenizer,
        )

        # _configs() stores the YAML location under 'cfg_path'. The path is
        # relative, so it is resolved against the current working directory.
        # NOTE(security): yaml.Loader can construct arbitrary Python objects;
        # keep this pointed at a trusted, project-owned file.
        with open(config['cfg_path'], 'r') as cfg_file:
            cfg = yaml.load(cfg_file, Loader=yaml.Loader)

        # NOTE(review): the checkpoint path is read from the YAML file, while
        # _configs() also defines a (currently empty) 'ckpt_path' entry --
        # confirm which of the two is meant to be authoritative.
        checkpoint_path = cfg['ckpt_path']
        self.model = self.prepare_model(checkpoint_path, model)
        # Inference inputs are moved to self.device, so the model must live
        # there too; eval() switches off training-only behaviour.
        self.model.to(self.device)
        self.model.eval()

        self.test_transform = transforms.Compose([
            transforms.Resize(
                (cfg['image_res'], cfg['image_res']),
                interpolation=Image.BICUBIC,
            ),
            transforms.ToTensor(),
            normalize,
        ])

    def inference_single_data(self, data):
        """Embed one item according to ``self.modality``.

        Args:
            data: a single image (``modality == 'image'``) or a single text
                (``modality == 'text'``).

        Returns:
            The embedding detached, moved to CPU, converted to NumPy and
            flattened.

        Raises:
            ValueError: if ``self.modality`` is neither ``'image'`` nor
                ``'text'``.
        """
        if self.modality == 'image':
            vec = self._inference_from_image(data)
        elif self.modality == 'text':
            vec = self._inference_from_text(data)
        else:
            # The attribute set in __init__ is `modality`, not `_modality`.
            raise ValueError(
                "modality[{}] not implemented.".format(self.modality))
        return vec.detach().cpu().numpy().flatten()

    def __call__(self, data):
        """Embed a single item or a list of items.

        Args:
            data: one item, or a ``list`` of items.

        Returns:
            A single embedding when exactly one item was supplied (either
            bare or as a one-element list); otherwise a list of embeddings
            (empty for an empty list).
        """
        batch = data if isinstance(data, list) else [data]
        results = [self.inference_single_data(item) for item in batch]
        if len(batch) == 1:
            return results[0]
        return results

    def _inference_from_text(self, text):
        """Tokenize ``text`` and return the text encoder's ``logits``."""
        # NOTE(review): `self.text_tokenizer` and `self.text_encoder` are not
        # assigned in __init__ in this file, so this path raises
        # AttributeError until they are wired to the tokenizer/model built
        # there -- confirm the intended ALBEF calls.
        tokens = self.text_tokenizer(
            text, return_tensors='pt', padding=True)['input_ids'].to(self.device)
        text_features = self.text_encoder(tokens).logits
        return text_features

    @arg(1, to_image_color('RGB'))
    def _inference_from_image(self, img):
        """Convert ``img`` to PIL, preprocess it and return image features."""
        # NOTE(review): `self.processor` and `self.clip_model` are not
        # assigned in __init__ in this file (and `self.test_transform` /
        # `self.model`, which are, go unused here) -- confirm the intended
        # ALBEF calls.
        image = to_pil(img)
        image = self.processor(images=image, return_tensors="pt").to(self.device)
        image_features = self.clip_model.get_image_features(**image)
        return image_features

    def _configs(self):
        """Return the table of supported models keyed by model name."""
        config = {
            'albef_4m': {
                'tokenizer': 'bert-base-uncased',
                'text_encoder': 'bert-base-uncased',
                'cfg_path': './configs/Retrieval_flickr.yaml',
                'ckpt_path': '',
            },
        }
        # Without this return, `self._configs()[model_name]` in __init__
        # failed with "'NoneType' object is not subscriptable".
        return config