from typing import Generator, NamedTuple
from functools import partial, reduce
import math
import logging
import av
import numpy as np
from towhee.types.image import Image
from towhee.operator.base import PyOperator
VideoOutput = NamedTuple("Outputs", [("image", Image), ("TIMESTAMP", int)])
logger = logging.getLogger()
UNIFORM_TEMPORAL_SUBSAMPLE = 'uniform_temporal_subsample'
class VideoDecoder(PyOperator):
Return images with RGB format.
def __init__(self, start_time=None, end_time=None, sample_type=None, args=None) -> None:
self._start_time = start_time
self._end_time = end_time
self._sample_type = sample_type
self._args = args if args is not None else {}
def get_sample(self, stream):
if self._sample_type is None:
return self._no_smaple
elif self._sample_type.lower() == SAMPLE_TYPE.UNIFORM_TEMPORAL_SUBSAMPLE:
duration = VideoDecoder.get_video_duration(stream)
end_time = self._end_time if self._end_time is not None and self._end_time <= duration else duration
start_time = self._start_time if self._start_time is not None else 0
nums = int(stream.rate * (end_time - start_time))
return partial(self._uniform_temporal_subsample, total_frames=nums)
raise RuntimeError('Unkown sample type: %s' % self._sample_type)
def _no_smaple(self, frame_iter):
if self._end_time is None:
yield from frame_iter
for frame in frame_iter:
frame.time < self._end_time
yield frame
def _uniform_temporal_subsample(self, frame_iter, total_frames):
num_samples = self._args.get('num_samples')
if num_samples is None:
raise RuntimeError('uniform_temporal_subsample lost args num_samples')
indexs = np.linspace(0, total_frames - 1, num_samples).astype('int')
cur_index = 0
count = 0
for frame in frame_iter:
if cur_index >= len(indexs):
while cur_index < len(indexs) and indexs[cur_index] <= count:
cur_index += 1
yield frame
count += 1
def _decdoe(video, container, start_time):
if start_time is not None:
start_offset = int(math.floor(start_time * (1 / video.time_base)))
start_offset = 0
seek_offset = start_offset
seek_offset = max(seek_offset - 1, 0)
container.seek(seek_offset, any_frame=False, backward=True, stream=video)
except av.AVError as e:
logger.error('Seek to start_time: %s sec failed, the offset is %s, errors: %s' % (start_time, seek_offset, str(e)))
raise RuntimeError from e
for frame in container.decode(video):
if frame.time < start_time:
yield frame
def get_video_duration(video):
if video.duration is not None:
return float(video.duration * video.time_base)
elif video.metadata.get('DURATION') is not None:
time_str = video.metadata['DURATION']
return reduce(lambda x, y: float(x) * 60 + float(y), time_str.split(':'))
return None
def __call__(self, video_path: str) -> Generator:
with av.open(video_path) as container:
stream = container.streams.video[0]
width = stream.width
height = stream.height
channel = 3
image_format = 'RGB'
frame_gen = VideoDecoder._decdoe(stream, container, self._start_time)
sample_function = self.get_sample(stream)
for frame in sample_function(frame_gen):
timestamp = int(frame.time * 1000)
ndarray = frame.to_ndarray(format='rgb24')
img = Image(ndarray.tobytes(), width, height, channel, image_format, None, key_frame=frame.key_frame)
yield VideoOutput(img, timestamp)
