import av

from towhee import register
from towhee.operator import PyOperator, OperatorFlag
from towhee.types.audio_frame import AudioFrame


@register(output_schema=['audio_frame'], flag=OperatorFlag.REUSEABLE)
class AudioDecoderFFmpeg(PyOperator):
    """Decode the first audio stream of a media file into ``AudioFrame`` objects.

    Args:
        batch_size: When ``<= 0`` (the default) the operator yields one
            ``AudioFrame`` at a time. When positive, it yields lists holding
            ``batch_size`` frames; the final list may be shorter.
        sample_rate: Target sample rate. Resampling is enabled when either
            ``sample_rate`` or ``layout`` is truthy.
        layout: Target channel layout name (e.g. ``'mono'``, ``'stereo'``).

    Note:
        When resampling is enabled and only one of ``sample_rate`` / ``layout``
        is given, the missing one falls back to ``8000`` / ``'mono'``
        respectively; it does NOT default to the source stream's value.
    """

    def __init__(self, batch_size=-1, sample_rate=None, layout=None) -> None:
        super().__init__()
        self._batch_size = batch_size
        self._sample_rate = sample_rate
        self._layout = layout

    def __call__(self, audio_path: str):
        """Lazily decode ``audio_path``.

        Args:
            audio_path: Path or URL handed to ``av.open``.

        Yields:
            ``AudioFrame`` objects when ``batch_size <= 0``, otherwise lists of
            ``AudioFrame`` objects.
        """
        if self._batch_size <= 0:
            yield from self._decode(audio_path)
            return

        batch = []
        for audio_frame in self._decode(audio_path):
            batch.append(audio_frame)
            if len(batch) == self._batch_size:
                yield batch
                batch = []
        # Emit the trailing partial batch, if any.
        if batch:
            yield batch

    def _build_resampler(self, stream):
        """Return an ``av.AudioResampler`` for ``stream``, or ``None`` if no resampling was requested."""
        if not (self._sample_rate or self._layout):
            return None
        return av.AudioResampler(
            # Keep the stream's sample format but force the packed
            # (interleaved) variant of it.
            format=av.AudioFormat(stream.format.name).packed,
            layout=self._layout if self._layout else 'mono',
            rate=self._sample_rate if self._sample_rate else 8000,
        )

    def _decode(self, audio_path):
        """Yield one ``AudioFrame`` per decoded (and optionally resampled) frame."""
        # The context manager guarantees the container is closed on
        # exhaustion, on error, and when the consumer abandons the generator
        # early (GeneratorExit is raised inside the ``with`` block).
        with av.open(audio_path) as container:
            stream = container.streams.get(audio=0)[0]
            resampler = self._build_resampler(stream)
            for frame in container.decode(stream):
                if resampler is None:
                    converted = (frame,)
                else:
                    # ``resample`` returns a list that may hold zero, one or
                    # several frames; forward all of them rather than
                    # indexing ``[0]``, which fails on an empty list and drops
                    # any additional frames.
                    converted = resampler.resample(frame)
                for out in converted:
                    timestamp = int(out.time * 1000)  # seconds -> milliseconds
                    yield AudioFrame(
                        out.to_ndarray(),
                        out.sample_rate,
                        timestamp,
                        out.layout.name,
                    )