logo
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Readme
Files and versions

64 lines
2.7 KiB

import math
import PyNvCodec as nvc
import numpy as np
from towhee.types import VideoFrame
class VPFDecode:
    """Decode a video with PyNvCodec (VPF) and expose frames as RGB ``VideoFrame`` objects.

    Two generator entry points are provided:

    * ``decode()`` yields consecutive frames, optionally after seeking to
      ``start_time``.
    * ``time_step_decode()`` yields one frame per seek, advancing the seek
      position by ``time_step`` each time.

    Args:
        video_path: Path (or URL) handed to ``nvc.PyNvDecoder``.
        gpu_id: GPU ordinal used for decoding, color conversion and download.
        start_time: Seek position to start from; ignored when ``None`` or
            not positive. NOTE(review): passed to ``nvc.SeekContext`` with
            ``BY_TIMESTAMP`` -- presumably seconds, confirm against VPF docs.
        time_step: Increment added to the seek position between frames in
            ``time_step_decode()``; same unit as ``start_time``.
    """

    def __init__(self, video_path: str, gpu_id: int, start_time: int = None, time_step=None):
        self._gpu_id = gpu_id
        self._nv_dec = nvc.PyNvDecoder(video_path, gpu_id)
        self._start_time = start_time
        self._time_step = time_step

        # Output frames keep the native resolution of the stream.
        self._target_w = self._nv_dec.Width()
        self._target_h = self._nv_dec.Height()
        self._time_base = self._nv_dec.Timebase()
        self._avg_rate = self._nv_dec.AvgFramerate()

        # NV12 -> RGB conversion on the GPU.
        self._to_rgb = nvc.PySurfaceConverter(
            self._target_w, self._target_h, nvc.PixelFormat.NV12, nvc.PixelFormat.RGB, self._gpu_id
        )
        self._cc = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)

        # The downloader depends only on size/format/GPU, so build it once
        # instead of once per frame.
        self._nv_dwn = nvc.PySurfaceDownloader(
            self._target_w, self._target_h, nvc.PixelFormat.RGB, self._gpu_id
        )

        # Filled in by every DecodeSingleSurface call (pts, key-frame flag).
        self._pdata = nvc.PacketData()

    def timestamp(self, pts) -> int:
        """Scale ``pts`` by the stream time base and by 1000, rounded to an int.

        NOTE(review): this is presumably milliseconds, assuming ``Timebase()``
        reports seconds per tick -- confirm.
        """
        return int(round(pts * self._time_base * 1000))

    def surface_to_videoframe(self, nv12_surface):
        """Convert a decoded NV12 surface to an RGB ``VideoFrame``.

        The timestamp and key-frame flag are read from the packet data filled
        in by the most recent decode call.

        Returns:
            A ``VideoFrame`` in 'RGB' mode, or ``None`` if ``nv12_surface`` is
            empty.

        Raises:
            RuntimeError: If color conversion or the GPU download fails.
        """
        if nv12_surface.Empty():
            return None

        rgb_surface = self._to_rgb.Execute(nv12_surface, self._cc)
        if rgb_surface.Empty():
            raise RuntimeError('Convert to rgb failed')

        # Host buffer the downloader writes into: height x width x 3 bytes.
        frame_rgb = np.ndarray(shape=(self._target_h, self._target_w, 3), dtype=np.uint8)
        if not self._nv_dwn.DownloadSingleSurface(rgb_surface, frame_rgb):
            raise RuntimeError('Download image from gpu failed')

        return VideoFrame(frame_rgb, 'RGB', self.timestamp(self._pdata.pts), self._pdata.key)

    def _seek_frame(self, ts):
        """Seek to the key frame preceding ``ts`` and return it as a ``VideoFrame`` (or ``None``)."""
        seek_ctx = nvc.SeekContext(
            ts, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
        )
        nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
        return self.surface_to_videoframe(nv12_surface)

    def time_step_decode(self):
        """Yield one frame per seek, stepping the seek position by ``time_step``.

        Starts at ``start_time`` (or 0 when it is ``None`` or not positive)
        and stops at the first seek that produces an empty surface.

        Raises:
            ValueError: If ``time_step`` is ``None`` or not positive. Without
                a positive step the seek position would never advance.
        """
        if self._time_step is None or self._time_step <= 0:
            raise ValueError(
                f'time_step must be a positive number, got {self._time_step!r}'
            )

        ts = self._start_time if self._start_time is not None and self._start_time > 0 else 0
        while True:
            frame = self._seek_frame(ts)
            if frame is None:
                break
            yield frame
            ts += self._time_step

    def decode(self):
        """Yield consecutive frames, optionally starting from ``start_time``.

        When ``start_time`` is positive, the first frame comes from a seek to
        the preceding key frame. Iteration ends at the first empty surface
        returned by sequential decoding. ``None`` is never yielded.
        """
        if self._start_time is not None and self._start_time > 0:
            frame = self._seek_frame(self._start_time)
            # An empty surface after the seek must not leak into the stream as
            # None; fall through so sequential decoding decides whether any
            # frames remain.
            if frame is not None:
                yield frame

        while True:
            nv12_surface = self._nv_dec.DecodeSingleSurface(self._pdata)
            frame = self.surface_to_videoframe(nv12_surface)
            if frame is None:
                break
            yield frame