# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy
import numpy as np
import torch
import os
from typing import List, Union
from torchvision import transforms
from towhee.models import frozen_in_time
from towhee.operator.base import NNOperator
from towhee import register
from PIL import Image as PILImage
from towhee.types import VideoFrame
from towhee.models.utils.video_transforms import transform_video, get_configs
from pathlib import Path
from transformers import AutoTokenizer


@register(output_schema=['vec'])
class FrozenInTime(NNOperator):
    """
    Extract an embedding for a video clip or a text with the Frozen In Time model.

    Args:
        model_name (str):
            Frozen In Time model name. It is stored on the operator; the
            architecture built in ``__init__`` is fixed regardless of it.
        modality (str):
            Flag that decides which encoder ``__call__`` runs.
            - 'video': return the video embedding
            - 'text': return the text embedding
        weight_path (str):
            Path to the pretrained model weights. Defaults to
            ``frozen_in_time_base_16_224.pth`` next to this file.
        device (str):
            Device to run the model on. Defaults to "cuda" when available,
            otherwise "cpu".
    """

    def __init__(self,
                 model_name: str = 'frozen_in_time_base_16_244',
                 modality: str = 'video',
                 weight_path: str = None,
                 device: str = None):
        super().__init__()
        self.model_name = model_name
        self.modality = modality

        if weight_path is None:
            weight_path = os.path.join(str(Path(__file__).parent), 'frozen_in_time_base_16_224.pth')

        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device

        # Number of frames the video branch is built for; video inputs with a
        # different frame count are resampled to this many frames.
        self.num_frames = 4
        self.model = frozen_in_time.FrozenInTime(
            img_size=224,
            patch_size=16,
            in_chans=3,
            num_frames=self.num_frames,
            attention_style='frozen_in_time',
            is_pretrained=True,
            weights_path=weight_path,
            projection_dim=256,
            video_pretrained_model='vit_base_16x224',
            video_is_load_pretrained=False,
            video_model_type='SpaceTimeTransformer',
            text_is_load_pretrained=False,
            device=self.device)
        self.model.to(self.device)

        # NOTE(review): TOKENIZERS_PARALLELISM is usually configured through an
        # environment variable; confirm that passing it as a keyword argument
        # here has the intended effect.
        self.tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased', TOKENIZERS_PARALLELISM=False)

        # num_frames is left unset here; it is filled in per call only when
        # the incoming clip does not already have self.num_frames frames.
        self.transform_cfgs = get_configs(
            side_size=256,
            crop_size=224,
            num_frames=None,
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        )
        self.model.eval()

    def __call__(self, data: Union[List[VideoFrame], List[str]]):
        """
        Run the encoder selected by ``self.modality`` on ``data``.

        Args:
            data: video frames when modality is 'video', text when it is 'text'.

        Returns:
            numpy.ndarray: the flattened embedding.

        Raises:
            ValueError: if ``self.modality`` is neither 'video' nor 'text'.
        """
        if self.modality == 'video':
            vec = self._inference_from_video(data)
        elif self.modality == 'text':
            vec = self._inference_from_text(data)
        else:
            # The attribute is `modality`; reading `_modality` here raised
            # AttributeError and hid the intended ValueError.
            raise ValueError("modality[{}] not implemented.".format(self.modality))
        return vec

    def _inference_from_text(self, text: List[str]):
        """
        Tokenize ``text`` and return its embedding as a flat numpy array.
        """
        text_data = self.tokenizer(text, return_tensors='pt')
        text_data = text_data.to(self.device)
        # Inference only: skip autograd bookkeeping (the result was already
        # detached, so the returned values are unchanged).
        with torch.no_grad():
            text_features = self.model.compute_text(text_data)
        return text_features.squeeze(0).detach().flatten().cpu().numpy()

    def _inference_from_video(self, data: List[VideoFrame]):
        """
        Preprocess a list of frames and return the clip embedding as a flat
        numpy array.
        """
        # Stack the frames into one float32 array scaled to [0, 1].
        video = numpy.stack([img.astype(numpy.float32) / 255. for img in data], axis=0)
        # NOTE(review): assert is stripped under `python -O`; kept so the
        # exception type seen by callers does not change.
        assert len(video.shape) == 4

        # Work on a per-call copy so one clip's frame count does not alter the
        # shared configuration used by later calls.
        transform_cfgs = dict(self.transform_cfgs)
        if video.shape[0] != self.num_frames:
            transform_cfgs['num_frames'] = self.num_frames

        # Move the last (channel) axis to the front: (t, ., ., c) -> (c, t, ., .)
        video = video.transpose(3, 0, 1, 2)
        video = transform_video(
            video=video,
            **transform_cfgs
        )
        # Add a batch axis, then swap axes 1 and 2 for the model.
        # presumably (1, c, t, ., .) -> (1, t, c, ., .) — confirm against transform_video
        video = video.to(self.device)[None, ...].transpose(1, 2)
        with torch.no_grad():
            visual_features = self.model.compute_video(video)
        return visual_features.squeeze(0).detach().flatten().cpu().numpy()