
Refactor

Signed-off-by: Jael Gu <mengjia.gu@zilliz.com>
main
Jael Gu 2 years ago
parent
commit
52325e25dc
  1. README.md (75 lines)
  2. __init__.py (19 lines)
  3. checkpoints/clmr_checkpoint_10000.pt (binary)
  4. clmr_magnatagatune.py (95 lines)
  5. models/__init__.py (1 line)
  6. models/model.py (23 lines)
  7. models/sample_cnn.py (67 lines)
  8. requirements.txt (4 lines)
  9. utils/__init__.py (1 line)
  10. utils/checkpoint.py (36 lines)

README.md (75 lines)

@@ -1,2 +1,75 @@
# clmr
# Audio Embedding with CLMR
*Author: Jael Gu*
## Description

The audio embedding operator converts an input audio clip into a dense vector that represents the clip's semantics.
This operator is built on top of the original implementation of [CLMR](https://github.com/Spijkervet/CLMR).
The [default model weight](./checkpoints/clmr_checkpoint_10000.pt) provided is pretrained on the [MagnaTagATune dataset](https://paperswithcode.com/dataset/magnatagatune) with [SampleCNN](./models/sample_cnn.py).
```python
import numpy as np
from towhee import ops
audio_encoder = ops.audio_embedding.clmr()
# Path or URL as input
audio_embedding = audio_encoder("/audio/path/or/url/")
# Audio data as input
audio_data = np.zeros((2, 441344))
sample_rate = 44100
audio_embedding = audio_encoder(audio_data, sample_rate)
```
## Factory Constructor
Create the operator via the following factory method:
***ops.audio_embedding.clmr()***
## Interface
An audio embedding operator generates vectors in numpy.ndarray given an audio file path or audio data in numpy.ndarray.

**Parameters:**

None.

**Returns**: *numpy.ndarray*

Audio embeddings.
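Judging from the SampleCNN code in this commit, each 59049-sample window at 22050 Hz yields one 512-dimensional vector, so the result has shape (num_windows, 512). A minimal sanity-check sketch with illustrative input values:

```python
import numpy as np
from towhee import ops

encoder = ops.audio_embedding.clmr()
emb = encoder(np.zeros((2, 441344), dtype=np.float32), 44100)
print(emb.shape)  # expected: (num_windows, 512) with the default weights
```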
## Code Example
Generate embeddings for the audio "test.wav".
*Write the pipeline in simplified style*:
```python
from towhee import dc

(
    dc.glob('test.wav')
      .audio_embedding.clmr()
      .show()
)
```
*Write the same pipeline with explicit input/output name specifications:*
```python
from towhee import dc

(
    dc.glob['path']('test.wav')
      .audio_embedding.clmr['path', 'vecs']()
      .select('vecs')
      .show()
)
```

__init__.py (19 lines)

@@ -0,0 +1,19 @@
# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .clmr_magnatagatune import ClmrMagnatagatune
def clmr():
    return ClmrMagnatagatune()
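A minimal usage sketch (assuming towhee routes the README's `ops.audio_embedding.clmr()` call to the factory above):

```python
from towhee import ops

# Resolves to clmr() above, which returns a ClmrMagnatagatune instance.
encoder = ops.audio_embedding.clmr()
embedding = encoder('test.wav')  # illustrative local file
```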

checkpoints/clmr_checkpoint_10000.pt (binary, stored with Git LFS)

Binary file not shown.

clmr_magnatagatune.py (95 lines)

@@ -0,0 +1,95 @@
# Copyright 2021 Zilliz. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
from pathlib import Path
from typing import Union

import torchaudio
import torch
import numpy

from towhee.operator import NNOperator
from towhee import register
sys.path.append(str(Path(__file__).parent))
from utils.checkpoint import load_encoder_checkpoint
from models.sample_cnn import SampleCNN
@register(output_schema=['vec'])
class ClmrMagnatagatune(NNOperator):
    """
    Pretrained CLMR operator for audio embedding.
    """

    def __init__(self, framework="pytorch") -> None:
        super().__init__(framework=framework)
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        weight_path = os.path.join(str(Path(__file__).parent),
                                   'checkpoints/clmr_checkpoint_10000.pt')
        state_dict = load_encoder_checkpoint(weight_path, 1)
        encoder = SampleCNN(strides=[3, 3, 3, 3, 3, 3, 3, 3, 3], supervised=False, out_dim=1)
        encoder.load_state_dict(state_dict)
        # Drop SampleCNN's final fc layer, then drop the ReLU of the last
        # conv block, so the model outputs raw 512-d features.
        new_encoder = torch.nn.Sequential(*(list(encoder.children())[:-1]))
        x = list(new_encoder[0][:10].children())
        y = torch.nn.Sequential(*list(new_encoder[0][10].children())[:-1])
        x.append(y)
        self.model = torch.nn.Sequential(*x)
        self.model.eval()
        self.model.to(self.device)
    def __call__(self, audio: Union[str, numpy.ndarray], sample_rate: int = None) -> numpy.ndarray:
        _sr = 22050           # CLMR was trained on 22050 Hz audio
        audio_length = 59049  # one SampleCNN input window (3^10 samples)
        if isinstance(audio, str):
            source = os.path.abspath(audio)
            audio, sr = torchaudio.load(source)
        elif isinstance(audio, numpy.ndarray):
            sr = sample_rate
            audio = torch.tensor(audio).to(torch.float32)
        if sr != _sr:
            transform = torchaudio.transforms.Resample(orig_freq=sr, new_freq=_sr)
            audio = transform(audio)
        with torch.no_grad():
            # Split into fixed-length windows; the trailing partial window is dropped.
            batch = torch.split(audio, audio_length, dim=1)
            batch = torch.cat(batch[:-1])
            batch = batch.unsqueeze(dim=1)
            batch = batch.to(self.device)
            features = numpy.squeeze(self.model(batch))
        embeddings = features.to("cpu")
        return embeddings.detach().numpy()
# if __name__ == "__main__":
#     encoder = ClmrMagnatagatune()
#
#     audio_path = "/audio/path/or/link"
#     vec = encoder(audio_path)
#
#     # audio_data = numpy.zeros((2, 441344))
#     # sample_rate = 44100
#     # vec = encoder(audio_data, sample_rate)
#
#     print(vec.shape)
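A minimal sketch of the windowing arithmetic used in `__call__` above (values illustrative: a 441344-sample stereo clip resampled from 44100 Hz to 22050 Hz leaves 220672 samples per channel):

```python
import torch

audio = torch.zeros(2, 220672)              # stereo, 220672 samples per channel
windows = torch.split(audio, 59049, dim=1)  # 3 full windows + 1 partial per channel
batch = torch.cat(windows[:-1])             # the trailing partial window is dropped
print(batch.shape)                          # torch.Size([6, 59049]): 3 windows x 2 channels
```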

models/__init__.py (1 line)

@@ -0,0 +1 @@

models/model.py (23 lines)

@@ -0,0 +1,23 @@
import torch.nn as nn


class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()

    def initialize(self, m):
        # Designed to be passed to nn.Module.apply; Kaiming-initializes Conv1d weights.
        if isinstance(m, nn.Conv1d):
            # nn.init.xavier_uniform_(m.weight)
            # if m.bias is not None:
            #     nn.init.xavier_uniform_(m.bias)
            nn.init.kaiming_uniform_(m.weight, mode="fan_in", nonlinearity="relu")


class Identity(nn.Module):
    def __init__(self):
        super(Identity, self).__init__()

    def forward(self, x):
        return x
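`initialize` is written as an `nn.Module.apply` hook. A minimal usage sketch (`TinyNet` is hypothetical):

```python
import torch.nn as nn

class TinyNet(Model):
    def __init__(self):
        super().__init__()
        self.conv = nn.Conv1d(1, 16, kernel_size=3)

net = TinyNet()
net.apply(net.initialize)  # Kaiming-initializes every Conv1d weight in the tree
```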

models/sample_cnn.py (67 lines)

@@ -0,0 +1,67 @@
import torch
import torch.nn as nn
from .model import Model
class SampleCNN(Model):
    def __init__(self, strides, supervised, out_dim):
        super(SampleCNN, self).__init__()
        self.strides = strides
        self.supervised = supervised
        self.sequential = [
            nn.Sequential(
                nn.Conv1d(1, 128, kernel_size=3, stride=3, padding=0),
                nn.BatchNorm1d(128),
                nn.ReLU(),
            )
        ]
        self.hidden = [
            [128, 128],
            [128, 128],
            [128, 256],
            [256, 256],
            [256, 256],
            [256, 256],
            [256, 256],
            [256, 256],
            [256, 512],
        ]
        assert len(self.hidden) == len(
            self.strides
        ), "Number of hidden layers and strides are not equal"
        for stride, (h_in, h_out) in zip(self.strides, self.hidden):
            self.sequential.append(
                nn.Sequential(
                    nn.Conv1d(h_in, h_out, kernel_size=stride, stride=1, padding=1),
                    nn.BatchNorm1d(h_out),
                    nn.ReLU(),
                    nn.MaxPool1d(stride, stride=stride),
                )
            )
        # 1 x 512
        self.sequential.append(
            nn.Sequential(
                nn.Conv1d(512, 512, kernel_size=3, stride=1, padding=1),
                nn.BatchNorm1d(512),
                nn.ReLU(),
            )
        )
        self.sequential = nn.Sequential(*self.sequential)
        if self.supervised:
            self.dropout = nn.Dropout(0.5)
        # fc must exist even when supervised=False: forward() always calls it,
        # and load_encoder_checkpoint injects zeroed fc weights into the state dict.
        self.fc = nn.Linear(512, out_dim)

    def forward(self, x):
        out = self.sequential(x)
        if self.supervised:
            out = self.dropout(out)
        out = out.reshape(x.shape[0], out.size(1) * out.size(2))
        logit = self.fc(out)
        return logit
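A quick shape check (a sketch): the stride-3 first conv plus nine stride-3 max pools downsample by 3^10 = 59049, so a single 59049-sample window collapses to one 512-channel frame:

```python
import torch

net = SampleCNN(strides=[3] * 9, supervised=False, out_dim=1)
net.eval()                      # eval mode: BatchNorm1d uses running stats, safe for one frame
x = torch.randn(1, 1, 59049)    # (batch, channel, samples)
print(net.sequential(x).shape)  # torch.Size([1, 512, 1])
```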

requirements.txt (4 lines)

@@ -0,0 +1,4 @@
torchaudio==0.9.0
torch==1.9.0
soundfile
numpy

utils/__init__.py (1 line)

@@ -0,0 +1 @@

utils/checkpoint.py (36 lines)

@@ -0,0 +1,36 @@
import torch
from collections import OrderedDict
def load_encoder_checkpoint(checkpoint_path: str, output_dim: int) -> OrderedDict:
    state_dict = torch.load(checkpoint_path, map_location=torch.device("cpu"))
    if "pytorch-lightning_version" in state_dict.keys():
        # PyTorch Lightning checkpoint: keep only encoder weights and strip the prefix.
        new_state_dict = OrderedDict(
            {
                k.replace("model.encoder.", ""): v
                for k, v in state_dict["state_dict"].items()
                if "model.encoder." in k
            }
        )
    else:
        new_state_dict = OrderedDict()
        for k, v in state_dict.items():
            if "encoder." in k:
                new_state_dict[k.replace("encoder.", "")] = v
    # The checkpoint carries no projection head, so provide zeroed fc weights
    # to satisfy SampleCNN's state dict.
    new_state_dict["fc.weight"] = torch.zeros(output_dim, 512)
    new_state_dict["fc.bias"] = torch.zeros(output_dim)
    return new_state_dict


def load_finetuner_checkpoint(checkpoint_path: str) -> OrderedDict:
    state_dict = torch.load(checkpoint_path, map_location=torch.device("cpu"))
    if "pytorch-lightning_version" in state_dict.keys():
        state_dict = OrderedDict(
            {
                k.replace("model.", ""): v
                for k, v in state_dict["state_dict"].items()
                if "model." in k
            }
        )
    return state_dict
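A minimal sketch of the prefix stripping performed by `load_encoder_checkpoint` (the keys are illustrative):

```python
from collections import OrderedDict

sd = {"model.encoder.sequential.0.0.weight": "w", "model.projector.0.weight": "p"}
remapped = OrderedDict(
    {k.replace("model.encoder.", ""): v for k, v in sd.items() if "model.encoder." in k}
)
print(list(remapped))  # ['sequential.0.0.weight']; non-encoder keys are filtered out
```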