
add vpf decode

Signed-off-by: junjie.jiang <junjie.jiang@zilliz.com>
main
junjie.jiang, 1 year ago
parent commit 878a2b9b0e
  1. __init__.py (+5)
  2. cpu_decode.py (+50)
  3. gpu_decode.py (+64)
  4. requirements.txt (+1)
  5. video_decoder.py (+81)

__init__.py

@@ -0,0 +1,5 @@
from .video_decoder import VideoDecoder


def VPF(gpu_id=0, start_time=None, end_time=None, sample_type=None, args=None):
    return VideoDecoder(gpu_id, start_time, end_time, sample_type, args)
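
`__init__.py` just forwards to the operator class added below. A minimal usage sketch, assuming the operator directory is importable as a package named `video_decode` (hypothetical name; in practice towhee resolves the operator from the hub):

# Hypothetical package name; towhee normally constructs the operator itself.
from video_decode import VPF

decoder = VPF(gpu_id=0, start_time=1, end_time=5,
              sample_type='time_step_sample', args={'time_step': 1})
for frame in decoder('example.mp4'):  # 'example.mp4' is illustrative
    print(frame.timestamp)            # millisecond timestamps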

cpu_decode.py

@@ -0,0 +1,50 @@
import math
import logging

import av

from towhee.types.video_frame import VideoFrame

logger = logging.getLogger()


class PyAVDecode:
    def __init__(self, video_path, start_time=None, time_step=None) -> None:
        self._container = av.open(video_path)
        self._stream = self._container.streams.video[0]
        self._start_time = start_time if start_time is not None else 0
        self._time_step = time_step

    def close(self):
        self._container.close()

    def time_step_decode(self):
        # Seek to `ts`, yield the first frame at or after it, then advance
        # by `time_step` until no more frames come back.
        ts = self._start_time
        is_end = False
        while not is_end:
            is_end = True
            offset = int(math.floor(ts / self._stream.time_base))
            self._container.seek(offset, stream=self._stream)
            for f in self._container.decode(self._stream):
                if f.time < ts:
                    continue
                yield self.av_frame_to_video_frame(f)
                is_end = False
                break
            ts += self._time_step

    def av_frame_to_video_frame(self, frame):
        timestamp = int(round(frame.time * 1000))  # seconds -> milliseconds
        ndarray = frame.to_ndarray(format='rgb24')
        img = VideoFrame(ndarray, 'RGB', timestamp, frame.key_frame)
        return img

    def decode(self):
        # Seek to the keyframe before start_time, then skip frames that are
        # still earlier than start_time.
        if self._start_time > 0:
            offset = int(math.floor(self._start_time / self._stream.time_base))
            self._container.seek(offset, any_frame=False, backward=True, stream=self._stream)
        for frame in self._container.decode(self._stream):
            if frame.time < self._start_time:
                continue
            yield self.av_frame_to_video_frame(frame)
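
The CPU path can be exercised on its own; a minimal sketch, assuming a local `example.mp4` and running from the operator directory:

from cpu_decode import PyAVDecode

# Sample one frame per second starting at t=2s.
d = PyAVDecode('example.mp4', start_time=2, time_step=1)
for frame in d.time_step_decode():
    print(frame.timestamp)  # milliseconds, set in av_frame_to_video_frame
d.close()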

gpu_decode.py

@@ -0,0 +1,64 @@
import numpy as np
import PyNvCodec as nvc

from towhee.types import VideoFrame


class VPFDecode:
    def __init__(self, video_path: str, gpu_id: int, start_time: int = None, time_step=None):
        self._gpu_id = gpu_id
        self._nv_dec = nvc.PyNvDecoder(video_path, gpu_id)
        self._start_time = start_time
        self._time_step = time_step
        self._target_w, self._target_h, self._time_base = (
            self._nv_dec.Width(), self._nv_dec.Height(), self._nv_dec.Timebase()
        )
        self._avg_rate = self._nv_dec.AvgFramerate()
        self._to_rgb = nvc.PySurfaceConverter(
            self._target_w, self._target_h, nvc.PixelFormat.NV12, nvc.PixelFormat.RGB, self._gpu_id
        )
        self._cc = nvc.ColorspaceConversionContext(nvc.ColorSpace.BT_709, nvc.ColorRange.JPEG)
        self._pdata = nvc.PacketData()

    def timestamp(self, pts) -> int:
        # pts in stream time_base units -> milliseconds
        return int(round(pts * self._time_base * 1000))

    def surface_to_videoframe(self, nv12_surface):
        # Convert the decoded NV12 surface to RGB on the GPU, then download
        # it into a host-side numpy array.
        if nv12_surface.Empty():
            return None
        nv_dwn = nvc.PySurfaceDownloader(self._target_w, self._target_h, nvc.PixelFormat.RGB, self._gpu_id)
        rgb24_small = self._to_rgb.Execute(nv12_surface, self._cc)
        if rgb24_small.Empty():
            raise RuntimeError('Convert to rgb failed')
        raw_frame_rgb = np.ndarray(shape=(self._target_h, self._target_w, 3), dtype=np.uint8)
        if not nv_dwn.DownloadSingleSurface(rgb24_small, raw_frame_rgb):
            raise RuntimeError('Download image from gpu failed')
        return VideoFrame(raw_frame_rgb, 'RGB', self.timestamp(self._pdata.pts), self._pdata.key)

    def time_step_decode(self):
        # Seek to `ts`, decode one frame, then advance by `time_step` until
        # the decoder returns an empty surface.
        ts = self._start_time if self._start_time is not None and self._start_time > 0 else 0
        while True:
            seek_ctx = nvc.SeekContext(
                ts, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
            )
            nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
            frame = self.surface_to_videoframe(nv12_surface)
            if frame is None:
                break
            yield frame
            ts += self._time_step

    def decode(self):
        # Optionally seek to start_time first, then decode sequentially
        # until the decoder returns an empty surface.
        if self._start_time is not None and self._start_time > 0:
            seek_ctx = nvc.SeekContext(
                self._start_time, nvc.SeekMode.PREV_KEY_FRAME, nvc.SeekCriteria.BY_TIMESTAMP
            )
            nv12_surface = self._nv_dec.DecodeSingleSurface(seek_ctx, self._pdata)
            yield self.surface_to_videoframe(nv12_surface)
        while True:
            nv12_surface = self._nv_dec.DecodeSingleSurface(self._pdata)
            frame = self.surface_to_videoframe(nv12_surface)
            if frame is None:
                break
            yield frame
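
The GPU path exposes the same generator contract; a minimal sketch, assuming PyNvCodec (NVIDIA VPF) is installed with a working GPU and `example.mp4` uses one of the supported codecs (h264/h265/vp9):

from gpu_decode import VPFDecode

d = VPFDecode('example.mp4', gpu_id=0, start_time=2)
for frame in d.decode():
    print(frame.timestamp)  # milliseconds, from packet pts * time_base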

requirements.txt

@@ -0,0 +1 @@
av

video_decoder.py

@@ -0,0 +1,81 @@
import logging

from towhee.operator.base import PyOperator

from .cpu_decode import PyAVDecode

logger = logging.getLogger()

try:
    from .gpu_decode import VPFDecode
except Exception:
    logger.error('Import VPFDecode failed, will use CPU decode')

    def VPFDecode(video_path, gpu_id, start_time=None, time_step=None):
        # Fallback adapter: keep the VPFDecode call signature but decode on CPU.
        return PyAVDecode(video_path, start_time, time_step)


class SAMPLE_TYPE:
    UNIFORM_TEMPORAL_SUBSAMPLE = 'uniform_temporal_subsample'
    TIME_STEP_SAMPLE = 'time_step_sample'


class VideoDecoder(PyOperator):
    '''
    VideoDecoder
    Returns frames in RGB format.
    '''
    def __init__(self, gpu_id=0, start_time=None, end_time=None, sample_type=None, args=None) -> None:
        super().__init__()
        self._gpu_id = gpu_id
        self._start_time = start_time if start_time is not None else 0
        self._end_time = end_time * 1000 if end_time is not None else None  # seconds -> milliseconds
        self._sample_type = sample_type.lower() if sample_type else None
        self._args = args if args is not None else {}

    def _gpu_decode(self, video_path):
        yield from VPFDecode(video_path, self._gpu_id, self._start_time).decode()

    def _cpu_decode(self, video_path):
        yield from PyAVDecode(video_path, self._start_time).decode()

    def _gpu_time_step_decode(self, video_path, time_step):
        yield from VPFDecode(video_path, self._gpu_id, self._start_time, time_step).time_step_decode()

    def _cpu_time_step_decode(self, video_path, time_step):
        yield from PyAVDecode(video_path, self._start_time, time_step).time_step_decode()

    def decode(self, video_path: str):
        try:
            yield from self._gpu_decode(video_path)
        except RuntimeError:
            logger.warning('GPU decode failed, only supports [h264|h265|vp9], falling back to CPU')
            yield from self._cpu_decode(video_path)

    def time_step_decode(self, video_path, time_step):
        try:
            yield from self._gpu_time_step_decode(video_path, time_step)
        except RuntimeError:
            logger.warning('GPU decode failed, only supports [h264|h265|vp9], falling back to CPU')
            yield from self._cpu_time_step_decode(video_path, time_step)

    def _filter(self, frames):
        # Stop once a frame's timestamp (milliseconds) passes end_time.
        for f in frames:
            if self._end_time and f.timestamp > self._end_time:
                break
            yield f

    def __call__(self, video_path: str):
        if self._sample_type is None:
            yield from self._filter(self.decode(video_path))
        elif self._sample_type == SAMPLE_TYPE.TIME_STEP_SAMPLE:
            time_step = self._args.get('time_step')
            if time_step is None:
                raise RuntimeError('time_step_sample requires the arg time_step')
            yield from self._filter(self.time_step_decode(video_path, time_step))
        elif self._sample_type == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE:
            # Not implemented yet.
            pass
        else:
            raise RuntimeError('Unknown sample type, only supports: [%s|%s]'
                               % (SAMPLE_TYPE.TIME_STEP_SAMPLE, SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE))
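
End to end, the operator tries GPU decode first and falls back to PyAV on RuntimeError. A minimal sketch of both sampling modes, assuming the same hypothetical `video_decode` package as above and an illustrative `example.mp4`:

from video_decode.video_decoder import VideoDecoder

# Plain decode, keeping frames up to t=10s.
decoder = VideoDecoder(gpu_id=0, end_time=10)
for f in decoder('example.mp4'):
    print(f.timestamp)

# One frame every 2 seconds.
sampler = VideoDecoder(sample_type='time_step_sample', args={'time_step': 2})
for f in sampler('example.mp4'):
    print(f.timestamp)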