import math

import PyNvCodec as nvc
import numpy as np

from towhee.types import VideoFrame


class VPFDecode:
    """Decode a video on the GPU through PyNvCodec and yield RGB ``VideoFrame`` objects.

    Frames are decoded to NV12 surfaces, converted to RGB on the GPU and then
    downloaded into host ``numpy`` arrays of shape ``(height, width, 3)``.
    """

    def __init__(self, video_path: str, gpu_id: int, start_time: int = None, time_step=None):
        """Open ``video_path`` on GPU ``gpu_id`` and prepare the conversion pipeline.

        Args:
            video_path: Path (or URL) handed to ``nvc.PyNvDecoder``.
            gpu_id: Index of the GPU used for decoding, conversion and download.
            start_time: Optional timestamp to seek to before decoding; values
                that are ``None`` or not positive mean "start from the beginning".
                NOTE(review): presumably seconds, as used by ``nvc.SeekContext``
                with ``BY_TIMESTAMP`` -- confirm.
            time_step: Increment added to the seek timestamp between frames in
                ``time_step_decode``; same unit as ``start_time``.
        """
        self._gpu_id = gpu_id
        self._nv_dec = nvc.PyNvDecoder(video_path, gpu_id)
        self._start_time = start_time
        self._time_step = time_step

        self._target_w = self._nv_dec.Width()
        self._target_h = self._nv_dec.Height()
        self._time_base = self._nv_dec.Timebase()
        self._avg_rate = self._nv_dec.AvgFramerate()

        self._to_rgb = nvc.PySurfaceConverter(
            self._target_w,
            self._target_h,
            nvc.PixelFormat.NV12,
            nvc.PixelFormat.RGB,
            self._gpu_id,
        )
        self._cc = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
        # The downloader only depends on the frame size, pixel format and GPU,
        # so build it once here instead of once per decoded frame.
        self._nv_dwn = nvc.PySurfaceDownloader(
            self._target_w, self._target_h, nvc.PixelFormat.RGB, self._gpu_id
        )
        # Filled in by every DecodeSingleSurface call; carries pts / key flag
        # of the most recently decoded packet.
        self._pdata = nvc.PacketData()

    def timestamp(self, pts) -> int:
        """Convert ``pts`` to an integer timestamp: ``round(pts * time_base * 1000)``.

        NOTE(review): this is milliseconds if the decoder's time base is
        expressed in seconds per tick -- confirm.
        """
        return int(round(pts * self._time_base * 1000))

    def surface_to_videoframe(self, nv12_surface):
        """Convert a decoded NV12 surface into an RGB ``VideoFrame`` on the host.

        Args:
            nv12_surface: Surface returned by ``DecodeSingleSurface``.

        Returns:
            A ``VideoFrame`` holding the RGB image, the timestamp derived from
            the last packet's pts, and its key-frame flag; ``None`` when the
            surface is empty.

        Raises:
            RuntimeError: If the colour conversion or the GPU download fails.
        """
        if nv12_surface.Empty():
            return None

        rgb_surface = self._to_rgb.Execute(nv12_surface, self._cc)
        if rgb_surface.Empty():
            raise RuntimeError('Convert to rgb failed')

        # Uninitialised host buffer; fully overwritten by the download below.
        raw_frame_rgb = np.ndarray(shape=(self._target_h, self._target_w, 3), dtype=np.uint8)
        if not self._nv_dwn.DownloadSingleSurface(rgb_surface, raw_frame_rgb):
            raise RuntimeError('Download image from gpu failed')

        return VideoFrame(raw_frame_rgb, 'RGB', self.timestamp(self._pdata.pts), self._pdata.key)

    def time_step_decode(self):
        """Yield one frame per seek, advancing the seek timestamp by ``time_step``.

        Starts at ``start_time`` (or 0 when it is ``None`` or not positive) and
        stops at the first empty surface.

        NOTE(review): seeks use ``PREV_KEY_FRAME``, so the frame returned for a
        timestamp may be the key frame preceding it -- confirm this is intended.

        Raises:
            ValueError: If ``time_step`` is ``None`` or not positive; the seek
                position could never advance in that case.
            RuntimeError: Propagated from ``surface_to_videoframe``.
        """
        if self._time_step is None or self._time_step <= 0:
            raise ValueError('time_step must be a positive number for time_step_decode')

        has_start = self._start_time is not None and self._start_time > 0
        ts = self._start_time if has_start else 0

        while True:
            seek_ctx = nvc.SeekContext(
                ts, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
            )
            nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
            frame = self.surface_to_videoframe(nv12_surface)
            if frame is None:
                break
            yield frame
            ts += self._time_step

    def decode(self):
        """Yield consecutive frames, optionally seeking to ``start_time`` first.

        The generator ends at the first empty surface.

        Raises:
            RuntimeError: Propagated from ``surface_to_videoframe``.
        """
        if self._start_time is not None and self._start_time > 0:
            seek_ctx = nvc.SeekContext(
                self._start_time, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
            )
            nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
            first_frame = self.surface_to_videoframe(nv12_surface)
            if first_frame is None:
                # Seek produced nothing (e.g. start_time past the end of the
                # stream): finish instead of handing ``None`` to the consumer.
                return
            yield first_frame

        while True:
            nv12_surface = self._nv_dec.DecodeSingleSurface(self._pdata)
            frame = self.surface_to_videoframe(nv12_surface)
            if frame is None:
                break
            yield frame