# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy
import numpy as np
import torch
import os
from typing import List, Union
from torchvision import transforms
from towhee.models import frozen_in_time
from towhee.operator.base import NNOperator
from towhee import register
from PIL import Image as PILImage
from towhee.types import VideoFrame
from towhee.models.utils.video_transforms import transform_video, get_configs
from pathlib import Path
from transformers import AutoTokenizer


@register(output_schema=['vec'])
class FrozenInTime(NNOperator):
    """
    extracts features for video or text with Frozen In Time model
    Args:
        model_name (str):
            Frozen In Time model name to be used in FrozenInTime
        modality (str):
            Flag to decide what to return
                - 'video': return video embedding
                - 'text': return a dense of text embeddings
        weight_path (str):
            Pretrained model weights
        device (str):
            the device to run model
    """

    def __init__(self, model_name: str = 'frozen_in_time_base_16_244', modality: str = 'video',
                 weight_path: str = None,
                 device: str = None):
        super().__init__()
        self.model_name = model_name
        self.modality = modality
        if weight_path is None:
            weight_path = os.path.join(str(Path(__file__).parent), 'frozen_in_time_base_16_224.pth')
        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"
        else:
            self.device = device
        self.num_frames = 4
        self.model = frozen_in_time.FrozenInTime(img_size=224,
                                                 patch_size=16,
                                                 in_chans=3,
                                                 num_frames=self.num_frames,
                                                 attention_style='frozen_in_time',
                                                 is_pretrained=True,
                                                 weights_path=weight_path,
                                                 projection_dim=256,
                                                 video_pretrained_model='vit_base_16x224',
                                                 video_is_load_pretrained=False,
                                                 video_model_type='SpaceTimeTransformer',
                                                 text_is_load_pretrained=False,
                                                 device=self.device)
        self.model.to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased', TOKENIZERS_PARALLELISM=False)
        self.transform_cfgs = get_configs(
            side_size=256,
            crop_size=224,
            num_frames=None,
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        )
        self.model.eval()

    def __call__(self, data: Union[List[VideoFrame], List[str]]):
        if self.modality == 'video':
            vec = self._inference_from_video(data)
        elif self.modality == 'text':
            vec = self._inference_from_text(data)
        else:
            raise ValueError("modality[{}] not implemented.".format(self._modality))
        return vec

    def _inference_from_text(self, text: List[str]):
        text_data = self.tokenizer(text, return_tensors='pt')
        # text_data = torch.tensor(text)
        text_data = text_data.to(self.device)
        text_features = self.model.compute_text(text_data)
        return text_features.squeeze(0).detach().flatten().cpu().numpy()

    def _inference_from_video(self, data: List[VideoFrame]):
        # Convert list of towhee.types.Image to numpy.ndarray in float32
        video = numpy.stack([img.astype(numpy.float32) / 255. for img in data], axis=0)
        assert len(video.shape) == 4
        if video.shape[0] != 4:
            self.transform_cfgs.update(num_frames=4)
        video = video.transpose(3, 0, 1, 2)  # twhc -> ctwh
        video = transform_video(
            video=video,
            **self.transform_cfgs
        )
        video = video.to(self.device)[None, ...].transpose(1, 2)
        visual_features = self.model.compute_video(video)
        return visual_features.squeeze(0).detach().flatten().cpu().numpy()