|  | @ -1,16 +1,9 @@ | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | from typing import Generator, NamedTuple |  |  |  | 
		
	
		
			
				|  |  | from functools import partial, reduce |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | import math |  |  |  | 
		
	
		
			
				|  |  | import logging |  |  | import logging | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | import av |  |  | import av | 
		
	
		
			
				|  |  | import numpy as np |  |  | import numpy as np | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | from towhee.types.video_frame import VideoFrame |  |  |  | 
		
	
		
			
				|  |  | from towhee.operator.base import PyOperator |  |  | from towhee.operator.base import PyOperator | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |  |  |  | from cpu_decode import PyAVDecode | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  | logger = logging.getLogger() |  |  | logger = logging.getLogger() | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
	
		
			
				|  | @ -29,56 +22,22 @@ class VideoDecoder(PyOperator): | 
		
	
		
			
				|  |  |     def __init__(self, start_time=None, end_time=None, sample_type=None, args=None) -> None: |  |  |     def __init__(self, start_time=None, end_time=None, sample_type=None, args=None) -> None: | 
		
	
		
			
				|  |  |         super().__init__() |  |  |         super().__init__() | 
		
	
		
			
				|  |  |         self._start_time = start_time if start_time is not None else 0 |  |  |         self._start_time = start_time if start_time is not None else 0 | 
		
	
		
			
				|  |  |         self._end_time = end_time |  |  |  | 
		
	
		
			
				|  |  |         self._sample_type = sample_type |  |  |  | 
		
	
		
			
				|  |  |  |  |  |         self._end_time = end_time if end_time is not None else None | 
		
	
		
			
				|  |  |  |  |  |         self._end_time_ms = end_time * 1000 if end_time is not None else None | 
		
	
		
			
				|  |  |  |  |  |         self._sample_type = sample_type.lower() if sample_type else None | 
		
	
		
			
				|  |  |         self._args = args if args is not None else {} |  |  |         self._args = args if args is not None else {} | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |     def get_sample(self, stream, duration): |  |  |  | 
		
	
		
			
				|  |  |         if self._sample_type is None: |  |  |  | 
		
	
		
			
				|  |  |             return self._no_sample |  |  |  | 
		
	
		
			
				|  |  |         elif self._sample_type.lower() == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE: |  |  |  | 
		
	
		
			
				|  |  |             end_time = self._end_time if self._end_time is not None and self._end_time <= duration else duration |  |  |  | 
		
	
		
			
				|  |  |             start_time = self._start_time if self._start_time is not None else 0 |  |  |  | 
		
	
		
			
				|  |  |             nums = int(stream.average_rate * (end_time - start_time)) |  |  |  | 
		
	
		
			
				|  |  |             return partial(self._uniform_temporal_subsample, total_frames=nums) |  |  |  | 
		
	
		
			
				|  |  |         elif self._sample_type.lower() == SAMPLE_TYPE.TIME_STEP_SAMPLE: |  |  |  | 
		
	
		
			
				|  |  |             start_time = self._start_time if self._start_time is not None else 0 |  |  |  | 
		
	
		
			
				|  |  |             end_time = self._end_time if self._end_time is not None and self._end_time <= duration else duration |  |  |  | 
		
	
		
			
				|  |  |             return partial(self._time_step_sample, start_time=start_time, end_time=end_time) |  |  |  | 
		
	
		
			
				|  |  |         else: |  |  |  | 
		
	
		
			
				|  |  |             raise RuntimeError('Unkown sample type: %s' % self._sample_type) |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |     def _no_sample(self, frame_iter): |  |  |  | 
		
	
		
			
				|  |  |         if self._end_time is None: |  |  |  | 
		
	
		
			
				|  |  |             yield from frame_iter |  |  |  | 
		
	
		
			
				|  |  |         else: |  |  |  | 
		
	
		
			
				|  |  |             for frame in frame_iter: |  |  |  | 
		
	
		
			
				|  |  |                 frame.time < self._end_time |  |  |  | 
		
	
		
			
				|  |  |                 yield frame |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |     def _time_step_sample(self, frame_iter, start_time, end_time): |  |  |  | 
		
	
		
			
				|  |  |         time_step = self._args.get('time_step') |  |  |  | 
		
	
		
			
				|  |  |         if time_step is None: |  |  |  | 
		
	
		
			
				|  |  |             raise RuntimeError('time_step_sample sample lost args time_step') |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |         time_index = start_time |  |  |  | 
		
	
		
			
				|  |  |         for frame in frame_iter: |  |  |  | 
		
	
		
			
				|  |  |             if time_index >= end_time: |  |  |  | 
		
	
		
			
				|  |  |                 break |  |  |  | 
		
	
		
			
				|  |  |  |  |  |     def decode(self, video_path: str): | 
		
	
		
			
				|  |  |  |  |  |         yield from PyAVDecode(video_path, self._start_time).decode() | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |             if frame.time >= time_index: |  |  |  | 
		
	
		
			
				|  |  |                 time_index += time_step |  |  |  | 
		
	
		
			
				|  |  |                 yield frame |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |     def _uniform_temporal_subsample(self, frame_iter, total_frames): |  |  |  | 
		
	
		
			
				|  |  |         num_samples = self._args.get('num_samples') |  |  |  | 
		
	
		
			
				|  |  |         if num_samples is None: |  |  |  | 
		
	
		
			
				|  |  |             raise RuntimeError('uniform_temporal_subsample lost args num_samples') |  |  |  | 
		
	
		
			
				|  |  |  |  |  |     def time_step_decode(self, video_path, time_step): | 
		
	
		
			
				|  |  |  |  |  |         yield from PyAVDecode(video_path, self._start_time, time_step).time_step_decode() | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |  |  |  |     def _uniform_temporal_subsample(self, frames, num_samples, total_frames): | 
		
	
		
			
				|  |  |         indexs = np.linspace(0, total_frames - 1, num_samples).astype('int') |  |  |         indexs = np.linspace(0, total_frames - 1, num_samples).astype('int') | 
		
	
		
			
				|  |  |         cur_index = 0 |  |  |         cur_index = 0 | 
		
	
		
			
				|  |  |         count = 0 |  |  |         count = 0 | 
		
	
		
			
				|  |  |         for frame in frame_iter: |  |  |  | 
		
	
		
			
				|  |  |  |  |  |         for frame in frames: | 
		
	
		
			
				|  |  |             if cur_index >= len(indexs): |  |  |             if cur_index >= len(indexs): | 
		
	
		
			
				|  |  |                 return |  |  |                 return | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
	
		
			
				|  | @ -87,72 +46,32 @@ class VideoDecoder(PyOperator): | 
		
	
		
			
				|  |  |                 yield frame |  |  |                 yield frame | 
		
	
		
			
				|  |  |             count += 1 |  |  |             count += 1 | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |     @staticmethod |  |  |  | 
		
	
		
			
				|  |  |     def _decdoe(video, container, start_time): |  |  |  | 
		
	
		
			
				|  |  |         if start_time is not None: |  |  |  | 
		
	
		
			
				|  |  |             start_offset = int(math.floor(start_time * (1 / video.time_base))) |  |  |  | 
		
	
		
			
				|  |  |         else: |  |  |  | 
		
	
		
			
				|  |  |             start_offset = 0 |  |  |  | 
		
	
		
			
				|  |  |         seek_offset = start_offset |  |  |  | 
		
	
		
			
				|  |  |         seek_offset = max(seek_offset - 1, 0) |  |  |  | 
		
	
		
			
				|  |  |         try: |  |  |  | 
		
	
		
			
				|  |  |             container.seek(seek_offset, any_frame=False, backward=True, stream=video) |  |  |  | 
		
	
		
			
				|  |  |         except av.AVError as e: |  |  |  | 
		
	
		
			
				|  |  |             logger.error('Seek to start_time: %s sec failed, the offset is %s, errors: %s' % (start_time, seek_offset, str(e))) |  |  |  | 
		
	
		
			
				|  |  |             raise RuntimeError from e |  |  |  | 
		
	
		
			
				|  |  |  |  |  |     def _filter(self, frames): | 
		
	
		
			
				|  |  |  |  |  |         for f in frames: | 
		
	
		
			
				|  |  |  |  |  |             if self._end_time_ms and f.timestamp > self._end_time_ms: | 
		
	
		
			
				|  |  |  |  |  |                 break | 
		
	
		
			
				|  |  |  |  |  |             yield f | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |         for frame in container.decode(video): |  |  |  | 
		
	
		
			
				|  |  |             if frame.time < start_time: |  |  |  | 
		
	
		
			
				|  |  |                 continue |  |  |  | 
		
	
		
			
				|  |  |             yield frame |  |  |  | 
		
	
		
			
				|  |  |  |  |  |     def frame_nums(self, video_path): | 
		
	
		
			
				|  |  |  |  |  |         with av.open(video_path) as c: | 
		
	
		
			
				|  |  |  |  |  |             video = c.streams.video[0] | 
		
	
		
			
				|  |  |  |  |  |             start = self._start_time if self._start_time is not None else 0 | 
		
	
		
			
				|  |  |  |  |  |             duration = c.duration / 1000000 | 
		
	
		
			
				|  |  |  |  |  |             end = self._end_time if self._end_time and self._end_time <= duration else duration | 
		
	
		
			
				|  |  |  |  |  |             return int(round((end - start) * video.average_rate)) | 
		
	
		
			
				|  |  | 
 |  |  | 
 | 
		
	
		
			
				|  |  |     @staticmethod |  |  |  | 
		
	
		
			
				|  |  |     def get_video_duration(video): |  |  |  | 
		
	
		
			
				|  |  |         if video.duration is not None: |  |  |  | 
		
	
		
			
				|  |  |             return float(video.duration * video.time_base) |  |  |  | 
		
	
		
			
				|  |  |         elif video.metadata.get('DURATION') is not None: |  |  |  | 
		
	
		
			
				|  |  |             time_str = video.metadata['DURATION'] |  |  |  | 
		
	
		
			
				|  |  |             return reduce(lambda x, y: float(x) * 60 + float(y), time_str.split(':')) |  |  |  | 
		
	
		
			
				|  |  |  |  |  |     def __call__(self, video_path: str): | 
		
	
		
			
				|  |  |  |  |  |         if self._sample_type is None: | 
		
	
		
			
				|  |  |  |  |  |             yield from self._filter(self.decode(video_path)) | 
		
	
		
			
				|  |  |  |  |  |         elif self._sample_type == SAMPLE_TYPE.TIME_STEP_SAMPLE: | 
		
	
		
			
				|  |  |  |  |  |             time_step = self._args.get('time_step') | 
		
	
		
			
				|  |  |  |  |  |             if time_step is None: | 
		
	
		
			
				|  |  |  |  |  |                 raise RuntimeError('time_step_sample sample lost args time_step') | 
		
	
		
			
				|  |  |  |  |  |             yield from self._filter(self.time_step_decode(video_path, time_step)) | 
		
	
		
			
				|  |  |  |  |  |         elif self._sample_type == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE: | 
		
	
		
			
				|  |  |  |  |  |             num_samples = self._args.get('num_samples') | 
		
	
		
			
				|  |  |  |  |  |             if num_samples is None: | 
		
	
		
			
				|  |  |  |  |  |                 raise RuntimeError('uniform_temporal_subsample lost args num_samples') | 
		
	
		
			
				|  |  |  |  |  |             yield from self._uniform_temporal_subsample(self.decode(video_path), num_samples, self.frame_nums(video_path)) | 
		
	
		
			
				|  |  |         else: |  |  |         else: | 
		
	
		
			
				|  |  |             return None |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |     def __call__(self, video_path: str) -> Generator: |  |  |  | 
		
	
		
			
				|  |  |         with av.open(video_path) as container: |  |  |  | 
		
	
		
			
				|  |  |             stream = container.streams.video[0] |  |  |  | 
		
	
		
			
				|  |  |             duration = VideoDecoder.get_video_duration(stream) |  |  |  | 
		
	
		
			
				|  |  |             if duration is None: |  |  |  | 
		
	
		
			
				|  |  |                 duration = float(container.duration) / 1000000 |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |             image_format = 'RGB' |  |  |  | 
		
	
		
			
				|  |  |             frame_gen = VideoDecoder._decdoe(stream, container, self._start_time) |  |  |  | 
		
	
		
			
				|  |  |             sample_function = self.get_sample(stream, duration) |  |  |  | 
		
	
		
			
				|  |  |             for frame in sample_function(frame_gen): |  |  |  | 
		
	
		
			
				|  |  |                 timestamp = int(frame.time * 1000) |  |  |  | 
		
	
		
			
				|  |  |                 ndarray = frame.to_ndarray(format='rgb24') |  |  |  | 
		
	
		
			
				|  |  |                 img = VideoFrame(ndarray, image_format, timestamp, frame.key_frame) |  |  |  | 
		
	
		
			
				|  |  |                 yield img |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | # if __name__ == '__main__': |  |  |  | 
		
	
		
			
				|  |  | #     video_path = "/home/junjie.jiangjjj/workspace/video/[The Rock] [1996] [Trailer] [#2]-16-l-rO5B64.mkv" |  |  |  | 
		
	
		
			
				|  |  | #     video_path1 = "/home/junjie.jiangjjj/workspace/video/'Eagle Eye' Trailer (2008)-_wkqo_Rd3_Q.mp4" |  |  |  | 
		
	
		
			
				|  |  | #     video_path2 = "/home/junjie.jiangjjj/workspace/video/2001 -  A Space Odyssey - Trailer [1968] HD-Z2UWOeBcsJI.webm" |  |  |  | 
		
	
		
			
				|  |  | #     # video_path3 = "/home/zhangchen/zhangchen_workspace/dataset/MSRVTT/msrvtt_data/MSRVTT_Videos/video9991.mp4" |  |  |  | 
		
	
		
			
				|  |  | #     video_path3 = "/home/junjie.jiangjjj/e2adc784b83446ae775f698b9d17c9fd392b2f75.flv" |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | #     def d(video_path): |  |  |  | 
		
	
		
			
				|  |  | #         d = VideoDecoder(10, 17, 'time_step_sample', {'time_step': 1}) |  |  |  | 
		
	
		
			
				|  |  | #         fs = d(video_path) |  |  |  | 
		
	
		
			
				|  |  | #         for f in fs: |  |  |  | 
		
	
		
			
				|  |  | #             print(f.mode, f.key_frame, f.timestamp) |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | #     d(video_path) |  |  |  | 
		
	
		
			
				|  |  | #     # print('#' * 100) |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | #     # with av.open(video_path) as container: |  |  |  | 
		
	
		
			
				|  |  | #     #     print(container.duration) |  |  |  | 
		
	
		
			
				|  |  | #     #     stream = container.streams.video[0] |  |  |  | 
		
	
		
			
				|  |  | #     #     print(stream.time_base) |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  | 
 |  |  |  | 
		
	
		
			
				|  |  |  |  |  |             raise RuntimeError('Unkown sample type, only supports: [%s|%s]' % (SAMPLE_TYPE.TIME_STEP_SAMPLE, SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE)) | 
		
	
	
		
			
				|  | 
 |