|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import json
|
|
|
|
from pathlib import Path
|
|
|
|
from typing import List
|
|
|
|
import torch
|
|
|
|
import numpy
|
|
|
|
from towhee import register
|
|
|
|
from towhee.operator.base import NNOperator
|
|
|
|
from towhee.types.video_frame import VideoFrame
|
|
|
|
from towhee.models.utils.video_transforms import transform_video, get_configs
|
|
|
|
from towhee.models.video_swin_transformer import video_swin_transformer
|
|
|
|
from towhee.models.video_swin_transformer.get_configs import configs
|
|
|
|
# Module-level handle on the root logger (getLogger() is called without a name).
log = logging.getLogger()
|
|
|
|
|
|
|
|
|
|
|
|
@register(output_schema=['labels', 'scores', 'features'])
class VideoSwinTransformer(NNOperator):
    """
    Generate a list of class labels given a video input data.
    Default labels are from [Kinetics400 Dataset](https://deepmind.com/research/open-source/kinetics).

    Args:
        model_name (`str`):
            Supported model names:
            - swin_t_k400_1k
        framework (`str`):
            Framework name forwarded to `NNOperator.__init__`.
        skip_preprocess (`bool`):
            Flag to skip video transforms.
            NOTE(review): the flag is stored on the instance, but `decoder_video`
            does not read it, so transforms are always applied - confirm intent.
        classmap (`str=None`):
            Path of the json file to match class names. The file is read the same
            way as the default labels file, i.e. as a `{class_name: class_index}`
            object. A ready-made mapping object (`{class_index: class_name}`) is
            also accepted and used as is. If None, the labels file named by the
            model configs (located next to this module) is loaded.
        topk (`int=5`):
            The number of classification labels to be returned (ordered by possibility from high to low).
            Values larger than the number of classes are clamped.
    """

    def __init__(self,
                 model_name: str = 'swin_t_k400_1k',
                 framework: str = 'pytorch',
                 skip_preprocess: bool = False,
                 classmap: str = None,
                 topk: int = 5,
                 ):
        super().__init__(framework=framework)
        self.model_name = model_name
        self.skip_preprocess = skip_preprocess
        self.topk = topk
        self.model_configs = configs(self.model_name)

        if classmap is None:
            # Default labels live next to this file; the file name comes from the model configs.
            class_file = os.path.join(str(Path(__file__).parent), self.model_configs['labels_file_name'])
            self.classmap = self._load_classmap(class_file)
        elif isinstance(classmap, (str, os.PathLike)):
            # A path was given: load it. Storing the raw string would make
            # `self.classmap[index]` return single characters of the path.
            self.classmap = self._load_classmap(classmap)
        else:
            # Already a mapping object: keep the previous pass-through behavior.
            self.classmap = classmap

        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

        self.model = video_swin_transformer.create_model(model_name=self.model_name,
                                                         pretrained=True,
                                                         device=self.device)
        self.model.to(self.device)
        # The operator only runs inference, so make sure train-time layers are disabled.
        self.model.eval()

        self.transform_cfgs = get_configs(
            side_size=224,
            crop_size=224,
            num_frames=32,
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        )

    @staticmethod
    def _load_classmap(path):
        """
        Read a `{class_name: class_index}` json file and invert it.

        Args:
            path (`str`):
                Path of the json file.

        Returns:
            (`dict`)
                Mapping from class index to class name, with double quotes
                stripped from the names.
        """
        with open(path, 'r', encoding='utf-8') as f:
            name_to_index = json.load(f)
        return {index: str(name).replace('"', '') for name, index in name_to_index.items()}

    def decoder_video(self, data: List[VideoFrame]):
        """
        Convert a list of frames into the batched tensor passed to the model.

        Args:
            data (`List[VideoFrame]`):
                Video frames; each frame must support `astype` and all frames
                must stack into a 4-dimensional array.

        Returns:
            The transformed video moved to `self.device` with a leading batch
            dimension of size 1 added.
        """
        # Scale pixel values by 1/255 and stack the frames along a new first axis.
        video = numpy.stack([img.astype(numpy.float32) / 255. for img in data], axis=0)
        assert len(video.shape) == 4, 'expected the stacked frames to form a 4-dimensional array'
        # Move the last axis to the front
        # (assumes frames are H x W x C, i.e. T,H,W,C -> C,T,H,W - TODO confirm).
        video = video.transpose(3, 0, 1, 2)
        video = transform_video(
            video=video,
            **self.transform_cfgs
        )
        # Add the batch dimension: [B x C x T x H x W]
        video = video.to(self.device)[None, ...]
        return video

    def __call__(self, video: List[VideoFrame]):
        """
        Args:
            video (`List[VideoFrame]`):
                Frames of the video to classify.

        Returns:
            (labels, scores, features)
                labels (`List[str]`): top-k class names, highest score first.
                scores (`List[float]`): matching softmax scores rounded to 5 decimals.
                features (`numpy.ndarray`): output of `model.forward_features`
                    with the batch dimension squeezed out.
        """
        inputs = self.decoder_video(video)
        # inputs [B x C x T x H x W]
        # Inference only: no autograd graph is needed for any of these tensors.
        with torch.no_grad():
            feats = self.model.forward_features(inputs)
            outs = self.model.head(feats)
            preds = torch.softmax(outs, dim=1)

        features = feats.detach().to('cpu').squeeze(0).numpy()

        # torch.topk raises when k exceeds the size of the dimension; clamp instead.
        k = min(self.topk, preds.shape[-1])
        pred_scores, pred_classes = preds.topk(k=k)
        labels = [self.classmap[int(i)] for i in pred_classes[0]]
        scores = [round(float(x), 5) for x in pred_scores[0]]

        return labels, scores, features
|
|
|
|
|