import math
import logging

import av

from towhee.types.video_frame import VideoFrame


logger = logging.getLogger()


class PyAVDecode:
    """CPU video decoder built on PyAV.

    Opens ``video_path`` eagerly in the constructor and exposes two frame
    generators:

    * :meth:`decode` — every frame from ``start_time`` to the end of the stream.
    * :meth:`time_step_decode` — one frame per ``time_step`` seconds.

    The instance owns an open container; call :meth:`close` when done, or use
    it as a context manager so the container is released even on error.
    """

    def __init__(self, video_path, start_time=None, time_step=None) -> None:
        self._container = av.open(video_path)
        self._stream = self._container.streams.video[0]
        self._start_time = start_time if start_time is not None else 0
        self._time_step = time_step

    def close(self):
        """Release the underlying av container."""
        self._container.close()

    # Context-manager support so callers can write ``with PyAVDecode(p) as d:``
    # and never leak the container when decoding raises.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def time_step_decode(self):
        """Yield one VideoFrame per ``time_step`` seconds, starting at start_time.

        Seeks before every sample point so sparse sampling does not decode the
        whole stream. Stops when a seek+decode pass produces no frame at or
        after the requested timestamp.

        Raises:
            RuntimeError: if the instance was built without a positive time_step.
        """
        if self._time_step is None or self._time_step <= 0:
            # Without a positive step, ``ts`` would never advance (or ``ts +=``
            # would raise a confusing TypeError on None) — fail loudly instead.
            raise RuntimeError('time_step_decode requires a positive time_step, got: %s' % self._time_step)

        ts = self._start_time
        is_end = False
        while not is_end:
            is_end = True
            # Convert seconds to the stream's time_base units for seek().
            offset = int(math.floor(ts / self._stream.time_base))
            self._container.seek(offset, stream=self._stream)
            for f in self._container.decode(self._stream):
                # NOTE(review): assumes f.time is never None here; a frame
                # without a pts would raise on this comparison — TODO confirm.
                if f.time < ts:
                    continue
                yield self.av_frame_to_video_frame(f)
                is_end = False  # got a frame for this step, keep going
                break
            ts += self._time_step

    def av_frame_to_video_frame(self, frame):
        """Convert a PyAV frame to a towhee VideoFrame (RGB24, timestamp in ms)."""
        timestamp = int(round(frame.time * 1000))
        ndarray = frame.to_ndarray(format='rgb24')
        return VideoFrame(ndarray, 'RGB', timestamp, frame.key_frame)

    def decode(self):
        """Yield every frame from ``start_time`` to the end of the stream."""
        if self._start_time > 0:
            offset = int(math.floor(self._start_time / self._stream.time_base))
            self._container.seek(offset, any_frame=False, backward=True, stream=self._stream)

        for frame in self._container.decode(self._stream):
            # seek() lands on the preceding keyframe; skip forward to start_time.
            if frame.time < self._start_time:
                continue
            yield self.av_frame_to_video_frame(frame)
class VideoDecoder(PyOperator):
    """Towhee operator that decodes a video file into VideoFrame objects.

    Three modes, selected by ``sample_type``:

    * ``None`` — yield every decoded frame (clipped at ``end_time`` if given).
    * ``'time_step_sample'`` — one frame per ``args['time_step']`` seconds.
    * ``'uniform_temporal_subsample'`` — ``args['num_samples']`` frames spread
      evenly over the requested time range.
    """

    def __init__(self, start_time=None, end_time=None, sample_type=None, args=None) -> None:
        super().__init__()
        self._start_time = start_time if start_time is not None else 0
        self._end_time = end_time
        # Pre-computed cut-off in milliseconds, comparable to frame.timestamp.
        self._end_time_ms = end_time * 1000 if end_time is not None else None
        self._sample_type = sample_type.lower() if sample_type else None
        self._args = args if args is not None else {}

    def decode(self, video_path: str):
        """Yield every frame of ``video_path`` from start_time onwards."""
        yield from PyAVDecode(video_path, self._start_time).decode()

    def time_step_decode(self, video_path, time_step):
        """Yield one frame per ``time_step`` seconds starting at start_time."""
        yield from PyAVDecode(video_path, self._start_time, time_step).time_step_decode()

    def _uniform_temporal_subsample(self, frames, num_samples, total_frames):
        """Yield ``num_samples`` frames evenly spaced across ``total_frames``.

        A frame may be yielded more than once when num_samples > total_frames
        (duplicate indices in the linspace).
        """
        # Target frame indices, evenly spread over the full range.
        indexs = np.linspace(0, total_frames - 1, num_samples).astype('int')
        cur_index = 0
        count = 0
        for frame in frames:
            if cur_index >= len(indexs):
                return
            # NOTE(review): this inner loop was reconstructed — the original
            # lines sat in unshown diff context. It yields the current frame
            # once per matching target index; verify against the repo history.
            while cur_index < len(indexs) and indexs[cur_index] <= count:
                cur_index += 1
                yield frame
            count += 1

    def _filter(self, frames):
        """Pass frames through until one crosses the end-time cut-off."""
        for f in frames:
            # ``is not None`` (not truthiness) so an explicit end_time of 0
            # still clips the stream instead of being silently ignored.
            if self._end_time_ms is not None and f.timestamp > self._end_time_ms:
                break
            yield f

    def frame_nums(self, video_path):
        """Estimate the frame count in [start_time, min(end_time, duration)].

        Raises:
            RuntimeError: if neither the container nor the video stream
                reports a duration.
        """
        with av.open(video_path) as c:
            video = c.streams.video[0]
            start = self._start_time if self._start_time is not None else 0
            # Container duration is in microseconds; fall back to the stream's
            # own duration (in time_base units) when the container lacks one,
            # instead of crashing with a TypeError on None.
            if c.duration is not None:
                duration = c.duration / 1000000
            elif video.duration is not None:
                duration = float(video.duration * video.time_base)
            else:
                raise RuntimeError('Cannot determine the duration of %s' % video_path)
            end = self._end_time if self._end_time is not None and self._end_time <= duration else duration
            return int(round((end - start) * video.average_rate))

    def __call__(self, video_path: str):
        """Decode ``video_path``, dispatching on the configured sample type.

        Raises:
            RuntimeError: on an unknown sample type or a missing required arg
                (``time_step`` / ``num_samples``).
        """
        if self._sample_type is None:
            yield from self._filter(self.decode(video_path))
        elif self._sample_type == SAMPLE_TYPE.TIME_STEP_SAMPLE:
            time_step = self._args.get('time_step')
            if time_step is None:
                raise RuntimeError('time_step_sample sample lost args time_step')
            yield from self._filter(self.time_step_decode(video_path, time_step))
        elif self._sample_type == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE:
            num_samples = self._args.get('num_samples')
            if num_samples is None:
                raise RuntimeError('uniform_temporal_subsample lost args num_samples')
            yield from self._uniform_temporal_subsample(self.decode(video_path), num_samples, self.frame_nums(video_path))
        else:
            raise RuntimeError('Unknown sample type, only supports: [%s|%s]' % (SAMPLE_TYPE.TIME_STEP_SAMPLE, SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE))