You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Files and versions

64 lines
2.7 KiB

import math
import PyNvCodec as nvc
import numpy as np
from towhee.types import VideoFrame
class VPFDecode:
def __init__(self, video_path: str, gpu_id: int, start_time: int = None, time_step=None):
self._gpu_id = gpu_id
self._nv_dec = nvc.PyNvDecoder(video_path, gpu_id)
self._start_time = start_time
self._time_step = time_step
self._target_w, self._target_h, self._time_base = self._nv_dec.Width(), self._nv_dec.Height(), self._nv_dec.Timebase()
self._avg_rate = self._nv_dec.AvgFramerate()
self._to_rgb = nvc.PySurfaceConverter(
self._target_w, self._target_h, nvc.PixelFormat.NV12, nvc.PixelFormat.RGB, self._gpu_id
self._cc = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
self._pdata = nvc.PacketData()
def timestamp(self, pts) -> int:
return int(round(pts * self._time_base * 1000))
def surface_to_videoframe(self, nv12_surface):
if nv12_surface.Empty():
return None
nv_dwn = nvc.PySurfaceDownloader(self._target_w, self._target_h, nvc.PixelFormat.RGB, self._gpu_id)
rgb24_small = self._to_rgb.Execute(nv12_surface, self._cc)
if rgb24_small.Empty():
raise RuntimeError('Convert to rgb failed')
rawFrameRGB = np.ndarray(shape=(self._target_h, self._target_w, 3), dtype=np.uint8)
if not nv_dwn.DownloadSingleSurface(rgb24_small, rawFrameRGB):
raise RuntimeError('Download image from gpu failed')
return VideoFrame(rawFrameRGB, 'RGB', self.timestamp(self._pdata.pts), self._pdata.key)
def time_step_decode(self):
ts = self._start_time if self._start_time is not None and self._start_time > 0 else 0
while True:
seek_ctx = nvc.SeekContext(
ts, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
frame = self.surface_to_videoframe(nv12_surface)
if frame is None:
yield frame
ts += self._time_step
def decode(self):
if self._start_time is not None and self._start_time > 0:
seek_ctx = nvc.SeekContext(
self._start_time, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
yield self.surface_to_videoframe(nv12_surface)
while True:
nv12_surface = self._nv_dec.DecodeSingleSurface(self._pdata)
frame = self.surface_to_videoframe(nv12_surface)
if frame is None:
yield frame