@@ -32,8 +32,8 @@ import logging
 from pathlib import Path
 from typing import List
 
-import resampy
 import torch
+import torchaudio
 import numpy
 
 from towhee.operator import NNOperator
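
Note on the import swap: resampy resamples numpy arrays, while torchaudio's Resample transform works on torch tensors, which is why the numpy-to-torch conversion moves earlier in the next hunk. A minimal before/after sketch (rates and data are made up, not from the patch):

    import numpy
    import torch
    import torchaudio

    wav = numpy.random.randn(44100).astype('float32')
    # before: audio = resampy.resample(wav, 44100, 22050)  # numpy in, numpy out
    audio = torch.from_numpy(wav)                          # after: convert to a tensor first
    audio = torchaudio.transforms.Resample(44100, 22050, dtype=audio.dtype)(audio)
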
@@ -85,22 +85,23 @@ class ClmrMagnatagatune(NNOperator):
         layout = data[0].layout
         if layout == 'stereo':
             frames = [frame.reshape(-1, 2) for frame in data]
-            audio = numpy.vstack(frames).transpose()
-            # audio = numpy.mean(audio, axis=0)
-            # audio = numpy.expand_dims(audio, 0)
+            audio = numpy.vstack(frames)
+            audio = numpy.mean(audio, axis=1)
         else:
             audio = numpy.hstack(data)
-            audio = numpy.expand_dims(audio, 0)
+        if len(audio.shape) != 1:
+            audio = audio.squeeze()
+        audio = self.int2float(audio, dtype='float32')
+        audio = torch.from_numpy(audio)
 
-        audio = self.int2float(audio).astype('float32')
         if sr != _sr:
-            audio = resampy.resample(audio, sr, _sr)
+            resampler = torchaudio.transforms.Resample(sr, _sr, dtype=audio.dtype)
+            audio = resampler(audio)
         with torch.no_grad():
-            audio = torch.from_numpy(audio)
-            batch = torch.split(audio, audio_length, dim=1)
-            batch = torch.cat(batch[:-1])
-            batch = batch.unsqueeze(dim=1)
-            batch = batch.to(self.device)
+            batch = torch.split(audio, audio_length)
+            batch = [x for x in batch if len(x) == audio_length]
+            batch = torch.vstack(batch)
+            batch = batch.unsqueeze(dim=1).to(self.device)
             features = numpy.squeeze(self.model(batch))
 
         return features.to('cpu').detach().numpy()
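
Two behavioral changes above are worth a sanity check: stereo input is now mixed down to a 1-D mono signal (numpy.vstack yields an (n_samples, 2) array, and the mean over axis 1 averages the two channels), and the old 2-D split/cat batching becomes a 1-D split that keeps only full-length windows. A rough sketch of the new windowing, assuming CLMR's usual 59049-sample SampleCNN input (the real audio_length comes from the operator, not this note):

    import torch

    audio = torch.randn(200_000)                            # hypothetical mono waveform
    audio_length = 59049                                    # assumed SampleCNN window size
    chunks = torch.split(audio, audio_length)               # last chunk is usually shorter
    chunks = [c for c in chunks if len(c) == audio_length]  # keep full windows only
    batch = torch.vstack(chunks).unsqueeze(dim=1)           # -> torch.Size([3, 1, 59049])
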
@@ -116,22 +117,26 @@ class ClmrMagnatagatune(NNOperator):
         dtype = numpy.dtype(dtype)
         assert dtype.kind == 'f'
         if wav.dtype.kind in 'iu':
-            ii = numpy.iinfo(wav.dtype)
-            abs_max = 2 ** (ii.bits - 1)
-            offset = ii.min + abs_max
-            return (wav.astype(dtype) - offset) / abs_max
+            # ii = numpy.iinfo(wav.dtype)
+            # abs_max = 2 ** (ii.bits - 1)
+            # offset = ii.min + abs_max
+            # return (wav.astype(dtype) - offset) / abs_max
+            if wav.dtype != 'int16':
+                wav = (wav >> 16).astype(numpy.int16)
+            assert wav.dtype == 'int16'
+            wav = (wav / 32768.0).astype(dtype)
+            return wav
         else:
             return wav.astype(dtype)
 
 
 # if __name__ == "__main__":
-#     encoder = ClmrMagnatagatune()
+#     import towhee
 #
-#     # audio_path = "/audio/path/or/link"
-#     # vec = encoder(audio_path)
+#     audio_path = "path/to/audio.wav"
+#     frames = towhee.glob(audio_path).audio_decode.ffmpeg(99999).flatten()[0]
 #
-#     audio_data = numpy.zeros((2, 441344))
-#     sample_rate = 44100
-#     vec = encoder(audio_data, sample_rate)
+#     encoder = ClmrMagnatagatune()
+#     vec = encoder(frames)
 #
 #     print(vec.shape)
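
A quick numeric check of the new int2float, as I read it: int16 samples are scaled by 1/32768 into [-1.0, 1.0), and wider integer formats are assumed to carry the signal in their high 16 bits, hence the >> 16 before scaling (note this replaces the old iinfo-based handling of unsigned and 8-bit input, kept above only as commented-out reference). Hand-picked values:

    import numpy

    wav = numpy.array([-32768, 0, 16384], dtype=numpy.int16)
    print((wav / 32768.0).astype('float32'))            # [-1.   0.   0.5]

    wav32 = numpy.array([1 << 30], dtype=numpy.int32)   # e.g. a 32-bit PCM sample
    print((wav32 >> 16).astype(numpy.int16))            # [16384], then scaled as above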