|
|
|
|
|
|
|
from typing import Generator, NamedTuple
|
|
|
|
from functools import partial, reduce
|
|
|
|
|
|
|
|
import math
|
|
|
|
import logging
|
|
|
|
|
|
|
|
import av
|
|
|
|
import numpy as np
|
|
|
|
|
|
|
|
from towhee.types.video_frame import VideoFrame
|
|
|
|
from towhee.operator.base import PyOperator
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger()
|
|
|
|
|
|
|
|
|
|
|
|
class SAMPLE_TYPE:
|
|
|
|
UNIFORM_TEMPORAL_SUBSAMPLE = 'uniform_temporal_subsample'
|
|
|
|
TIME_STEP_SAMPLE = 'time_step_sample'
|
|
|
|
|
|
|
|
|
|
|
|
class VideoDecoder(PyOperator):
|
|
|
|
'''
|
|
|
|
VideoDecoder
|
|
|
|
Return images with RGB format.
|
|
|
|
'''
|
|
|
|
|
|
|
|
def __init__(self, start_time=None, end_time=None, sample_type=None, args=None) -> None:
|
|
|
|
super().__init__()
|
|
|
|
self._start_time = start_time if start_time is not None else 0
|
|
|
|
self._end_time = end_time
|
|
|
|
self._sample_type = sample_type
|
|
|
|
self._args = args if args is not None else {}
|
|
|
|
|
|
|
|
def get_sample(self, stream, duration):
|
|
|
|
if self._sample_type is None:
|
|
|
|
return self._no_sample
|
|
|
|
elif self._sample_type.lower() == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE:
|
|
|
|
end_time = self._end_time if self._end_time is not None and self._end_time <= duration else duration
|
|
|
|
start_time = self._start_time if self._start_time is not None else 0
|
|
|
|
nums = int(stream.average_rate * (end_time - start_time))
|
|
|
|
return partial(self._uniform_temporal_subsample, total_frames=nums)
|
|
|
|
elif self._sample_type.lower() == SAMPLE_TYPE.TIME_STEP_SAMPLE:
|
|
|
|
start_time = self._start_time if self._start_time is not None else 0
|
|
|
|
end_time = self._end_time if self._end_time is not None and self._end_time <= duration else duration
|
|
|
|
return partial(self._time_step_sample, start_time=start_time, end_time=end_time)
|
|
|
|
else:
|
|
|
|
raise RuntimeError('Unkown sample type: %s' % self._sample_type)
|
|
|
|
|
|
|
|
def _no_sample(self, frame_iter):
|
|
|
|
if self._end_time is None:
|
|
|
|
yield from frame_iter
|
|
|
|
else:
|
|
|
|
for frame in frame_iter:
|
|
|
|
frame.time < self._end_time
|
|
|
|
yield frame
|
|
|
|
|
|
|
|
def _time_step_sample(self, frame_iter, start_time, end_time):
|
|
|
|
time_step = self._args.get('time_step')
|
|
|
|
if time_step is None:
|
|
|
|
raise RuntimeError('time_step_sample sample lost args time_step')
|
|
|
|
|
|
|
|
time_index = start_time
|
|
|
|
for frame in frame_iter:
|
|
|
|
if time_index >= end_time:
|
|
|
|
break
|
|
|
|
|
|
|
|
if frame.time >= time_index:
|
|
|
|
time_index += time_step
|
|
|
|
yield frame
|
|
|
|
|
|
|
|
def _uniform_temporal_subsample(self, frame_iter, total_frames):
|
|
|
|
num_samples = self._args.get('num_samples')
|
|
|
|
if num_samples is None:
|
|
|
|
raise RuntimeError('uniform_temporal_subsample lost args num_samples')
|
|
|
|
|
|
|
|
indexs = np.linspace(0, total_frames - 1, num_samples).astype('int')
|
|
|
|
cur_index = 0
|
|
|
|
count = 0
|
|
|
|
for frame in frame_iter:
|
|
|
|
if cur_index >= len(indexs):
|
|
|
|
return
|
|
|
|
|
|
|
|
while cur_index < len(indexs) and indexs[cur_index] <= count:
|
|
|
|
cur_index += 1
|
|
|
|
yield frame
|
|
|
|
count += 1
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def _decdoe(video, container, start_time):
|
|
|
|
if start_time is not None:
|
|
|
|
start_offset = int(math.floor(start_time * (1 / video.time_base)))
|
|
|
|
else:
|
|
|
|
start_offset = 0
|
|
|
|
seek_offset = start_offset
|
|
|
|
seek_offset = max(seek_offset - 1, 0)
|
|
|
|
try:
|
|
|
|
container.seek(seek_offset, any_frame=False, backward=True, stream=video)
|
|
|
|
except av.AVError as e:
|
|
|
|
logger.error('Seek to start_time: %s sec failed, the offset is %s, errors: %s' % (start_time, seek_offset, str(e)))
|
|
|
|
raise RuntimeError from e
|
|
|
|
|
|
|
|
for frame in container.decode(video):
|
|
|
|
if frame.time < start_time:
|
|
|
|
continue
|
|
|
|
yield frame
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_video_duration(video):
|
|
|
|
if video.duration is not None:
|
|
|
|
return float(video.duration * video.time_base)
|
|
|
|
elif video.metadata.get('DURATION') is not None:
|
|
|
|
time_str = video.metadata['DURATION']
|
|
|
|
return reduce(lambda x, y: float(x) * 60 + float(y), time_str.split(':'))
|
|
|
|
else:
|
|
|
|
return None
|
|
|
|
|
|
|
|
def __call__(self, video_path: str) -> Generator:
|
|
|
|
with av.open(video_path) as container:
|
|
|
|
stream = container.streams.video[0]
|
|
|
|
duration = VideoDecoder.get_video_duration(stream)
|
|
|
|
if duration is None:
|
|
|
|
duration = float(container.duration) / 1000000
|
|
|
|
|
|
|
|
image_format = 'RGB'
|
|
|
|
frame_gen = VideoDecoder._decdoe(stream, container, self._start_time)
|
|
|
|
sample_function = self.get_sample(stream, duration)
|
|
|
|
for frame in sample_function(frame_gen):
|
|
|
|
timestamp = int(frame.time * 1000)
|
|
|
|
ndarray = frame.to_ndarray(format='rgb24')
|
|
|
|
img = VideoFrame(ndarray, image_format, timestamp, frame.key_frame)
|
|
|
|
yield img
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# if __name__ == '__main__':
|
|
|
|
# video_path = "/home/junjie.jiangjjj/workspace/video/[The Rock] [1996] [Trailer] [#2]-16-l-rO5B64.mkv"
|
|
|
|
# video_path1 = "/home/junjie.jiangjjj/workspace/video/'Eagle Eye' Trailer (2008)-_wkqo_Rd3_Q.mp4"
|
|
|
|
# video_path2 = "/home/junjie.jiangjjj/workspace/video/2001 - A Space Odyssey - Trailer [1968] HD-Z2UWOeBcsJI.webm"
|
|
|
|
# # video_path3 = "/home/zhangchen/zhangchen_workspace/dataset/MSRVTT/msrvtt_data/MSRVTT_Videos/video9991.mp4"
|
|
|
|
# video_path3 = "/home/junjie.jiangjjj/e2adc784b83446ae775f698b9d17c9fd392b2f75.flv"
|
|
|
|
|
|
|
|
# def d(video_path):
|
|
|
|
# d = VideoDecoder(10, 17, 'time_step_sample', {'time_step': 1})
|
|
|
|
# fs = d(video_path)
|
|
|
|
# for f in fs:
|
|
|
|
# print(f.mode, f.key_frame, f.timestamp)
|
|
|
|
|
|
|
|
# d(video_path)
|
|
|
|
# # print('#' * 100)
|
|
|
|
|
|
|
|
# # with av.open(video_path) as container:
|
|
|
|
# # print(container.duration)
|
|
|
|
# # stream = container.streams.video[0]
|
|
|
|
# # print(stream.time_base)
|
|
|
|
|
|
|
|
|