frozen-in-time
copied
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Readme
Files and versions
114 lines
4.8 KiB
114 lines
4.8 KiB
# Copyright 2021 Zilliz. All rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import numpy
|
|
import numpy as np
|
|
import torch
|
|
import os
|
|
from typing import List, Union
|
|
from torchvision import transforms
|
|
from towhee.models import frozen_in_time
|
|
from towhee.operator.base import NNOperator
|
|
from towhee import register
|
|
from PIL import Image as PILImage
|
|
from towhee.types import VideoFrame
|
|
from towhee.models.utils.video_transforms import transform_video, get_configs
|
|
from pathlib import Path
|
|
from transformers import AutoTokenizer
|
|
|
|
|
|
@register(output_schema=['vec'])
|
|
class FrozenInTime(NNOperator):
|
|
"""
|
|
extracts features for video or text with Frozen In Time model
|
|
Args:
|
|
model_name (str):
|
|
Frozen In Time model name to be used in FrozenInTime
|
|
modality (str):
|
|
Flag to decide what to return
|
|
- 'video': return video embedding
|
|
- 'text': return a dense of text embeddings
|
|
weight_path (str):
|
|
Pretrained model weights
|
|
device (str):
|
|
the device to run model
|
|
"""
|
|
|
|
def __init__(self, model_name: str = 'frozen_in_time_base_16_244', modality: str = 'video',
|
|
weight_path: str = None,
|
|
device: str = None):
|
|
super().__init__()
|
|
self.model_name = model_name
|
|
self.modality = modality
|
|
if weight_path is None:
|
|
weight_path = os.path.join(str(Path(__file__).parent), 'frozen_in_time_base_16_224.pth')
|
|
if device is None:
|
|
self.device = "cuda" if torch.cuda.is_available() else "cpu"
|
|
else:
|
|
self.device = device
|
|
self.num_frames = 4
|
|
self.model = frozen_in_time.FrozenInTime(img_size=224,
|
|
patch_size=16,
|
|
in_chans=3,
|
|
num_frames=self.num_frames,
|
|
attention_style='frozen_in_time',
|
|
is_pretrained=True,
|
|
weights_path=weight_path,
|
|
projection_dim=256,
|
|
video_pretrained_model='vit_base_16x224',
|
|
video_is_load_pretrained=False,
|
|
video_model_type='SpaceTimeTransformer',
|
|
text_is_load_pretrained=False,
|
|
device=self.device)
|
|
self.model.to(self.device)
|
|
self.tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased', TOKENIZERS_PARALLELISM=False)
|
|
self.transform_cfgs = get_configs(
|
|
side_size=256,
|
|
crop_size=224,
|
|
num_frames=None,
|
|
mean=[0.485, 0.456, 0.406],
|
|
std=[0.229, 0.224, 0.225],
|
|
)
|
|
self.model.eval()
|
|
|
|
def __call__(self, data: Union[List[VideoFrame], List[str]]):
|
|
if self.modality == 'video':
|
|
vec = self._inference_from_video(data)
|
|
elif self.modality == 'text':
|
|
vec = self._inference_from_text(data)
|
|
else:
|
|
raise ValueError("modality[{}] not implemented.".format(self._modality))
|
|
return vec
|
|
|
|
def _inference_from_text(self, text: List[str]):
|
|
text_data = self.tokenizer(text, return_tensors='pt')
|
|
# text_data = torch.tensor(text)
|
|
text_data = text_data.to(self.device)
|
|
text_features = self.model.compute_text(text_data)
|
|
return text_features.squeeze(0).detach().flatten().cpu().numpy()
|
|
|
|
def _inference_from_video(self, data: List[VideoFrame]):
|
|
# Convert list of towhee.types.Image to numpy.ndarray in float32
|
|
video = numpy.stack([img.astype(numpy.float32) / 255. for img in data], axis=0)
|
|
assert len(video.shape) == 4
|
|
if video.shape[0] != 4:
|
|
self.transform_cfgs.update(num_frames=4)
|
|
video = video.transpose(3, 0, 1, 2) # twhc -> ctwh
|
|
video = transform_video(
|
|
video=video,
|
|
**self.transform_cfgs
|
|
)
|
|
video = video.to(self.device)[None, ...].transpose(1, 2)
|
|
visual_features = self.model.compute_video(video)
|
|
return visual_features.squeeze(0).detach().flatten().cpu().numpy()
|
|
|