diff --git a/__init__.py b/__init__.py
new file mode 100644
index 0000000..8d9886d
--- /dev/null
+++ b/__init__.py
@@ -0,0 +1,5 @@
+from .video_decoder import VideoDecoder
+
+
+def VPF(gpu_id=0, start_time=None, end_time=None, sample_type=None, args=None):
+    return VideoDecoder(gpu_id, start_time, end_time, sample_type, args)
diff --git a/cpu_decode.py b/cpu_decode.py
new file mode 100644
index 0000000..094b8e6
--- /dev/null
+++ b/cpu_decode.py
@@ -0,0 +1,50 @@
+import math
+import logging
+import av
+
+from towhee.types.video_frame import VideoFrame
+
+
+logger = logging.getLogger()
+
+
+class PyAVDecode:
+    def __init__(self, video_path, start_time=None, time_step=None) -> None:
+        self._container = av.open(video_path)
+        self._stream = self._container.streams.video[0]
+        self._start_time = start_time if start_time is not None else 0
+        self._time_step = time_step
+
+    def close(self):
+        self._container.close()
+
+    def time_step_decode(self):
+        ts = self._start_time
+        is_end = False
+        while not is_end:
+            is_end = True
+            offset = int(math.floor(ts / self._stream.time_base))
+            self._container.seek(offset, stream=self._stream)
+            for f in self._container.decode(self._stream):
+                if f.time < ts:
+                    continue
+                yield self.av_frame_to_video_frame(f)
+                is_end = False
+                break
+            ts += self._time_step
+
+    def av_frame_to_video_frame(self, frame):
+        timestamp = int(round(frame.time * 1000))
+        ndarray = frame.to_ndarray(format='rgb24')
+        img = VideoFrame(ndarray, 'RGB', timestamp, frame.key_frame)
+        return img
+
+    def decode(self):
+        if self._start_time > 0:
+            offset = int(math.floor(self._start_time / self._stream.time_base))
+            self._container.seek(offset, any_frame=False, backward=True, stream=self._stream)
+
+        for frame in self._container.decode(self._stream):
+            if frame.time < self._start_time:
+                continue
+            yield self.av_frame_to_video_frame(frame)
diff --git a/gpu_decode.py b/gpu_decode.py
new file mode 100644
index 0000000..73d54c8
--- /dev/null
+++ b/gpu_decode.py
@@ -0,0 +1,64 @@
+import math
+import PyNvCodec as nvc
+import numpy as np
+from towhee.types import VideoFrame
+
+
+class VPFDecode:
+    def __init__(self, video_path: str, gpu_id: int, start_time: int = None, time_step=None):
+        self._gpu_id = gpu_id
+        self._nv_dec = nvc.PyNvDecoder(video_path, gpu_id)
+        self._start_time = start_time
+        self._time_step = time_step
+        self._target_w, self._target_h, self._time_base = self._nv_dec.Width(), self._nv_dec.Height(), self._nv_dec.Timebase()
+        self._avg_rate = self._nv_dec.AvgFramerate()
+        self._to_rgb = nvc.PySurfaceConverter(
+            self._target_w, self._target_h, nvc.PixelFormat.NV12, nvc.PixelFormat.RGB, self._gpu_id
+        )
+        self._cc = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
+        self._pdata = nvc.PacketData()
+
+    def timestamp(self, pts) -> int:
+        return int(round(pts * self._time_base * 1000))
+
+    def surface_to_videoframe(self, nv12_surface):
+        if nv12_surface.Empty():
+            return None
+
+        nv_dwn = nvc.PySurfaceDownloader(self._target_w, self._target_h, nvc.PixelFormat.RGB, self._gpu_id)
+        rgb24_small = self._to_rgb.Execute(nv12_surface, self._cc)
+        if rgb24_small.Empty():
+            raise RuntimeError('Convert to rgb failed')
+
+        rawFrameRGB = np.ndarray(shape=(self._target_h, self._target_w, 3), dtype=np.uint8)
+        if not nv_dwn.DownloadSingleSurface(rgb24_small, rawFrameRGB):
+            raise RuntimeError('Download image from gpu failed')
+        return VideoFrame(rawFrameRGB, 'RGB', self.timestamp(self._pdata.pts), self._pdata.key)
+
+    def time_step_decode(self):
+        ts = self._start_time if self._start_time is not None and self._start_time > 0 else 0
+        while True:
+            seek_ctx = nvc.SeekContext(
+                ts, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
+            )
+            nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
+            frame = self.surface_to_videoframe(nv12_surface)
+            if frame is None:
+                break
+            yield frame
+            ts += self._time_step
+
+    def decode(self):
+        if self._start_time is not None and self._start_time > 0:
+            seek_ctx = nvc.SeekContext(
+                self._start_time, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
+            )
+            nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
+            yield self.surface_to_videoframe(nv12_surface)
+
+        while True:
+            nv12_surface = self._nv_dec.DecodeSingleSurface(self._pdata)
+            frame = self.surface_to_videoframe(nv12_surface)
+            if frame is None:
+                break
+            yield frame
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..dc1ce6e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1 @@
+av
diff --git a/video_decoder.py b/video_decoder.py
new file mode 100644
index 0000000..9aa3a55
--- /dev/null
+++ b/video_decoder.py
@@ -0,0 +1,78 @@
+import logging
+
+from towhee.operator.base import PyOperator
+
+from cpu_decode import PyAVDecode
+
+logger = logging.getLogger()
+
+try:
+    from gpu_decode import VPFDecode
+except Exception:
+    logger.error('Failed to import the GPU decoder, falling back to CPU decode')
+    VPFDecode = PyAVDecode
+
+
+class SAMPLE_TYPE:
+    UNIFORM_TEMPORAL_SUBSAMPLE = 'uniform_temporal_subsample'
+    TIME_STEP_SAMPLE = 'time_step_sample'
+
+
+class VideoDecoder(PyOperator):
+    '''
+    VideoDecoder
+    Return images with RGB format.
+    '''
+
+    def __init__(self, gpu_id=0, start_time=None, end_time=None, sample_type=None, args=None) -> None:
+        super().__init__()
+        self._gpu_id = gpu_id
+        self._start_time = start_time if start_time is not None else 0
+        self._end_time = end_time * 1000 if end_time is not None else None
+        self._sample_type = sample_type.lower() if sample_type else None
+        self._args = args if args is not None else {}
+
+    def _gpu_decode(self, video_path):
+        yield from VPFDecode(video_path, self._gpu_id, self._start_time).decode()
+
+    def _cpu_decode(self, video_path):
+        yield from PyAVDecode(video_path, self._start_time).decode()
+
+    def _gpu_time_step_decode(self, video_path, time_step):
+        yield from VPFDecode(video_path, self._gpu_id, self._start_time, time_step).time_step_decode()
+
+    def _cpu_time_step_decode(self, video_path, time_step):
+        yield from PyAVDecode(video_path, self._start_time, time_step).time_step_decode()
+
+    def decode(self, video_path: str):
+        try:
+            yield from self._gpu_decode(video_path)
+        except RuntimeError:
+            logger.warning('GPU decode failed (only h264/h265/vp9 are supported), falling back to CPU decode')
+            yield from self._cpu_decode(video_path)
+
+    def time_step_decode(self, video_path, time_step):
+        try:
+            yield from self._gpu_time_step_decode(video_path, time_step)
+        except RuntimeError:
+            logger.warning('GPU decode failed (only h264/h265/vp9 are supported), falling back to CPU decode')
+            yield from self._cpu_time_step_decode(video_path, time_step)
+
+    def _filter(self, frames):
+        for f in frames:
+            if self._end_time and f.timestamp > self._end_time:
+                break
+            yield f
+
+    def __call__(self, video_path: str):
+        if self._sample_type is None:
+            yield from self._filter(self.decode(video_path))
+        elif self._sample_type == SAMPLE_TYPE.TIME_STEP_SAMPLE:
+            time_step = self._args.get('time_step')
+            if time_step is None:
+                raise RuntimeError('time_step_sample requires a time_step value in args')
+            yield from self._filter(self.time_step_decode(video_path, time_step))
+        elif self._sample_type == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE:
+            raise RuntimeError('uniform_temporal_subsample is not implemented yet')
+        else:
+            raise RuntimeError('Unknown sample type, only supports: [%s|%s]' % (SAMPLE_TYPE.TIME_STEP_SAMPLE, SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE))
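
Usage sketch (not part of the patch): a minimal example of how the operator added above could be driven, assuming towhee and av are installed and these modules are on the Python path. The file name 'example.mp4' and the chosen times are hypothetical; with sample_type='time_step_sample' the decoder yields roughly one RGB VideoFrame per time_step seconds, using VPF on the GPU when PyNvCodec is importable and falling back to PyAV otherwise.

    from video_decoder import VideoDecoder

    # Sample one frame every 2 seconds between second 1 and second 20 of the clip.
    decoder = VideoDecoder(
        gpu_id=0,
        start_time=1,
        end_time=20,
        sample_type='time_step_sample',
        args={'time_step': 2},
    )

    # __call__ is a generator; frame.timestamp is in milliseconds.
    for frame in decoder('example.mp4'):
        print(frame.timestamp)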