# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
from pathlib import Path

import torch
from torch import nn
from torchvision import transforms
from torchvision.transforms.functional import InterpolationMode
from transformers import AutoProcessor, BlipForImageTextRetrieval

from towhee import register
from towhee.operator.base import NNOperator, OperatorFlag
from towhee.types.arg import arg, to_image_color
from towhee.types.image_utils import from_pil, to_pil


#@accelerate
class BLIPModelVision(nn.Module):
    """
    Image branch of a BLIP retrieval model.

    Wraps a ``BlipForImageTextRetrieval`` instance and exposes only the
    vision encoder followed by the vision projection head.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, image):
        """
        Embed a batch of preprocessed images.

        Args:
            image: pixel tensor as produced by the BLIP processor
                (``pixel_values``).

        Returns:
            The projection of the hidden state at position 0 of the vision
            encoder output, one row per input image.
        """
        # Index 0 of the encoder output is the last hidden state.
        image_embeds = self.model.vision_model(image)[0]
        # Only position 0 along the sequence axis is projected.
        image_embeds = self.model.vision_proj(image_embeds[:, 0, :])
        return image_embeds


#@accelerate
class BLIPModelText(nn.Module):
    """
    Text branch of a BLIP retrieval model.

    Wraps a ``BlipForImageTextRetrieval`` instance and exposes only the
    text encoder followed by the text projection head.
    """

    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, input_ids, attention_mask):
        """
        Embed a batch of tokenized sentences.

        Args:
            input_ids: token id tensor from the BLIP processor.
            attention_mask: matching attention mask tensor.

        Returns:
            The projection of the hidden state at position 0 of the text
            encoder output, one row per input sentence.
        """
        # return_dict=False yields a tuple whose first item is the last
        # hidden state.
        text_features = self.model.text_encoder(
            input_ids,
            attention_mask=attention_mask,
            return_dict=False,
        )[0]
        text_features = self.model.text_proj(text_features[:, 0, :])
        return text_features


@register(output_schema=['vec'])
class Blip(NNOperator):
    """
    BLIP multi-modal embedding operator.

    Args:
        model_name (str): key into the table returned by ``_configs()``.
        modality (str): ``'image'`` or ``'text'``; selects which branch of
            the model is used by ``__call__``.
        device (str): currently not honoured, see the note in ``__init__``.
        checkpoint_path (str): currently unused.

    Raises:
        KeyError: if ``model_name`` is not a known configuration.
        ValueError: if ``modality`` is neither ``'image'`` nor ``'text'``.
    """

    def __init__(self, model_name: str, modality: str, device: str = 'cpu', checkpoint_path: str = None):
        super().__init__()
        self.model_name = model_name
        # Must be set before it is inspected below.
        self._modality = modality

        # Lookup kept so that an unknown model name still fails with KeyError.
        self._configs()[model_name]

        # Reject an unsupported modality before loading any weights.
        if modality not in ('image', 'text'):
            raise ValueError("modality[{}] not implemented.".format(modality))

        pretrained = "Salesforce/blip-itm-base-coco"
        model = BlipForImageTextRetrieval.from_pretrained(pretrained)
        # The processor performs image preprocessing and text tokenization.
        self.processor = AutoProcessor.from_pretrained(pretrained)

        if modality == 'image':
            self.model = BLIPModelVision(model)
        else:
            self.model = BLIPModelText(model)

        # NOTE(review): the `device` argument has always been overridden by
        # this auto-detection; kept as-is so default callers do not silently
        # move from GPU to CPU. Confirm the intended policy.
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(self.device)
        self.model.eval()

    def __call__(self, data):
        """
        Embed ``data`` with the branch chosen at construction time.

        Args:
            data: an image for the ``'image'`` modality, or text for the
                ``'text'`` modality.

        Returns:
            A flattened numpy array holding the embedding.

        Raises:
            ValueError: if the stored modality is not supported.
        """
        # The result is detached anyway, so no gradients are needed.
        with torch.no_grad():
            if self._modality == 'image':
                vec = self._inference_from_image(data)
            elif self._modality == 'text':
                vec = self._inference_from_text(data)
            else:
                raise ValueError("modality[{}] not implemented.".format(self._modality))
        return vec.detach().cpu().numpy().flatten()

    def _inference_from_text(self, text):
        """Tokenize ``text`` and return the first row of the text embeddings."""
        inputs = self.processor(text=text, padding=True, return_tensors="pt")
        inputs = inputs.to(self.device)
        text_feature = self.model(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
        )[0]
        return text_feature

    @arg(1, to_image_color('RGB'))
    def _inference_from_image(self, img):
        """Preprocess ``img`` (converted to RGB by the decorator) and embed it."""
        inputs = self.processor(images=img, return_tensors="pt")
        inputs = inputs.to(self.device)
        # The vision wrapper expects the pixel tensor, not the whole batch
        # object returned by the processor.
        image_feature = self.model(inputs["pixel_values"])
        return image_feature

    def _configs(self):
        """Return the table of supported model configurations, keyed by name."""
        return {
            'blip_base': {
                'weights': 'https://storage.googleapis.com/sfr-vision-language-research/BLIP/models/model_base.pth',
                'image_size': 224,
            },
        }