drl
              
                 
                
            
          copied
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
			
			Readme
Files and versions
		
			
				
					94 lines
				
				3.7 KiB
			
		
		
			
		
	
	
					94 lines
				
				3.7 KiB
			| 
											3 years ago
										 | # Copyright 2021 Zilliz. All rights reserved. | ||
|  | # | ||
|  | # Licensed under the Apache License, Version 2.0 (the "License"); | ||
|  | # you may not use this file except in compliance with the License. | ||
|  | # You may obtain a copy of the License at | ||
|  | # | ||
|  | #     http://www.apache.org/licenses/LICENSE-2.0 | ||
|  | # | ||
|  | # Unless required by applicable law or agreed to in writing, software | ||
|  | # distributed under the License is distributed on an "AS IS" BASIS, | ||
|  | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
|  | # See the License for the specific language governing permissions and | ||
|  | # limitations under the License. | ||
|  | 
 | ||
|  | import numpy as np | ||
|  | import torch | ||
|  | 
 | ||
|  | from typing import List, Union | ||
|  | from torchvision import transforms | ||
|  | from towhee.models.clip4clip import convert_tokens_to_id | ||
|  | from towhee.operator.base import NNOperator | ||
|  | from towhee import register | ||
|  | from towhee.models import drl, clip4clip | ||
|  | from PIL import Image as PILImage | ||
|  | from towhee.types import VideoFrame | ||
|  | from pathlib import Path | ||
|  | 
 | ||
|  | 
 | ||
@register(output_schema=['vec'])
class DRL(NNOperator):
    """
    DRL multi-modal embedding operator.

    Wraps a DRL model built by ``towhee.models.drl.create_model`` and produces
    token-level text features or frame-level video features, depending on the
    modality chosen at construction time.

    Args:
        base_encoder (str): Name of the base encoder, forwarded to
            ``drl.create_model``.
        modality (str): Which branch to run in ``__call__``; ``'video'`` or
            ``'text'``. Any other value raises ``ValueError`` at call time.
        weight_path (str): Path to the pretrained weights. Defaults to
            ``clip_vit_b32_wti.pth`` located next to this file.
        device (str): Torch device string. Defaults to ``"cuda"`` when
            available, otherwise ``"cpu"``.
    """

    def __init__(self, base_encoder: str, modality: str, weight_path: str = None, device: str = None):
        super().__init__()
        self.modality = modality
        if weight_path is None:
            weight_path = str(Path(__file__).parent / 'clip_vit_b32_wti.pth')
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"
        self.device = device

        self.model = drl.create_model(base_encoder=base_encoder, pretrained=True, cdcr=0, weights_path=weight_path)
        # Inputs are moved to ``self.device`` at inference time, so the model
        # must live on the same device or the forward pass fails.
        self.model.to(self.device)
        self.model.eval()

        self.tokenize = clip4clip.SimpleTokenizer()
        # Preprocessing applied to every frame: resize + center crop to
        # 224x224, then normalize with the mean/std constants below.
        self.tfms = transforms.Compose([
            transforms.Resize(224, interpolation=transforms.InterpolationMode.BICUBIC),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(
                (0.48145466, 0.4578275, 0.40821073), (0.26862954, 0.26130258, 0.27577711)),
        ])

    def __call__(self, data: Union[str, List[VideoFrame]]):
        """
        Embed ``data`` according to the configured modality.

        Args:
            data: A text string when modality is ``'text'``, or a list of
                video frames when modality is ``'video'``.

        Returns:
            numpy.ndarray: The feature matrix produced by the selected branch.

        Raises:
            ValueError: If the configured modality is neither ``'video'`` nor
                ``'text'``.
        """
        if self.modality == 'video':
            return self._inference_from_video(data)
        if self.modality == 'text':
            return self._inference_from_text(data)
        # The attribute is ``modality``; referencing ``_modality`` here used to
        # mask this ValueError with an AttributeError.
        raise ValueError(f"modality[{self.modality}] not implemented.")

    def _inference_from_text(self, text: str):
        """
        Compute text features for a single sentence.

        Args:
            text (str): The sentence to embed.

        Returns:
            numpy.ndarray: Text features of shape (N_t, D).
        """
        self.model.eval()
        text_ids = convert_tokens_to_id(self.tokenize, text)
        text_ids = torch.tensor(text_ids).unsqueeze(0).to(self.device)
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            text_features = self.model.get_text_feat(text_ids)  # B(1), N_t, D
        return text_features.squeeze(0).detach().cpu().numpy()  # N_t, D

    def _inference_from_video(self, img_list: List[VideoFrame]):
        """
        Compute video features for a list of frames.

        The frames are written into a fixed buffer of 12 slots; unused slots
        stay zero and are masked out. If more than 12 frames are supplied,
        only the first 12 are used.

        Args:
            img_list (List[VideoFrame]): Frames of one video clip.

        Returns:
            numpy.ndarray: Video features of shape (N_v, D).
        """
        self.model.eval()
        max_frames = 12
        # Clamp to the buffer size so longer clips do not overflow it.
        frames = img_list[:max_frames]
        num_frames = len(frames)

        # Frame buffer laid out as (frames, channel, h, w), which is the shape
        # the model receives when called with ``shaped=True``.
        video = torch.zeros((max_frames, 3, 224, 224), dtype=torch.float32)
        for i, frame in enumerate(frames):
            pil_img = PILImage.fromarray(frame, frame.mode)
            video[i] = self.tfms(pil_img)

        # 1 marks a slot holding a real frame, 0 marks zero padding.
        video_mask = torch.zeros((1, max_frames), dtype=torch.float32)
        video_mask[0, :num_frames] = 1.0

        video = video.to(self.device)
        video_mask = video_mask.to(self.device)

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            visual_output = self.model.get_video_feat(video, video_mask, shaped=True)  # B(1), N_v, D

        return visual_output.squeeze(0).detach().cpu().numpy()  # N_v, D
