mirror of
https://github.com/svc-develop-team/so-vits-svc.git
synced 2025-01-08 11:57:43 +08:00
commit
d8c91d675b
12
README.md
12
README.md
@ -175,6 +175,15 @@ If you are using the `rmvpe` F0 Predictor, you will need to download the pre-tra
|
||||
- download model at [rmvpe.pt](https://huggingface.co/datasets/ylzz1997/rmvpe_pretrain_model/resolve/main/rmvpe.pt)
|
||||
- Place it under the `pretrain` directory
|
||||
|
||||
##### FCPE(Preview version)
|
||||
|
||||
[FCPE(Fast Context-base Pitch Estimator)](https://github.com/CNChTu/MelPE) is a dedicated F0 predictor designed for real-time voice conversion and will become the preferred F0 predictor for sovits real-time voice conversion in the future.(The paper is being written)
|
||||
|
||||
If you are using the `fcpe` F0 Predictor, you will need to download the pre-trained FCPE model.
|
||||
|
||||
- download model at [fcpe.pt](https://huggingface.co/datasets/ylzz1997/rmvpe_pretrain_model/resolve/main/fcpe.pt)
|
||||
- Place it under the `pretrain` directory
|
||||
|
||||
## 📊 Dataset Preparation
|
||||
|
||||
Simply place the dataset in the `dataset_raw` directory with the following file structure:
|
||||
@ -304,6 +313,7 @@ dio
|
||||
pm
|
||||
harvest
|
||||
rmvpe
|
||||
fcpe
|
||||
```
|
||||
|
||||
If the training set is too noisy,it is recommended to use `crepe` to handle f0
|
||||
@ -364,7 +374,7 @@ Required parameters:
|
||||
|
||||
Optional parameters: see the next section
|
||||
- `-lg` | `--linear_gradient`: The cross fade length of two audio slices in seconds. If there is a discontinuous voice after forced slicing, you can adjust this value. Otherwise, it is recommended to use the default value of 0.
|
||||
- `-f0p` | `--f0_predictor`: Select a F0 predictor, options are `crepe`, `pm`, `dio`, `harvest`, `rmvpe`, default value is `pm`(note: f0 mean pooling will be enable when using `crepe`)
|
||||
- `-f0p` | `--f0_predictor`: Select a F0 predictor, options are `crepe`, `pm`, `dio`, `harvest`, `rmvpe`,`fcpe`, default value is `pm`(note: f0 mean pooling will be enable when using `crepe`)
|
||||
- `-a` | `--auto_predict_f0`: automatic pitch prediction, do not enable this when converting singing voices as it can cause serious pitch issues.
|
||||
- `-cm` | `--cluster_model_path`: Cluster model or feature retrieval index path, if left blank, it will be automatically set as the default path of these models. If there is no training cluster or feature retrieval, fill in at will.
|
||||
- `-cr` | `--cluster_infer_ratio`: The proportion of clustering scheme or feature retrieval ranges from 0 to 1. If there is no training clustering model or feature retrieval, the default is 0.
|
||||
|
@ -143,7 +143,7 @@ wget -P pretrain/ https://huggingface.co/lj1995/VoiceConversionWebUI/resolve/mai
|
||||
+ 预训练底模文件: `G_0.pth` `D_0.pth`
|
||||
+ 放在`logs/44k`目录下
|
||||
|
||||
+ 扩散模型预训练底模文件: `model_0.pt `
|
||||
+ 扩散模型预训练底模文件: `model_0.pt`
|
||||
+ 放在`logs/44k/diffusion`目录下
|
||||
|
||||
从 svc-develop-team(待定)或任何其他地方获取 Sovits 底模
|
||||
@ -176,6 +176,17 @@ unzip -od pretrain/nsf_hifigan pretrain/nsf_hifigan_20221211.zip
|
||||
+ 下载模型 [rmvpe.pt](https://huggingface.co/datasets/ylzz1997/rmvpe_pretrain_model/resolve/main/rmvpe.pt)
|
||||
+ 放在`pretrain`目录下
|
||||
|
||||
##### FCPE(预览版)
|
||||
|
||||
> 你说的对,但是[FCPE](https://github.com/CNChTu/MelPE)是由svc-develop-team自主研发的一款全新的F0预测器,后面忘了
|
||||
|
||||
[FCPE(Fast Context-base Pitch Estimator)](https://github.com/CNChTu/MelPE)是一个为实时语音转换所设计的专用F0预测器,他将在未来成为Sovits实时语音转换的首选F0预测器.(论文未来会有的)
|
||||
|
||||
如果使用 `fcpe` F0预测器的话,需要下载预训练的 FCPE 模型
|
||||
|
||||
+ 下载模型 [fcpe.pt](https://huggingface.co/datasets/ylzz1997/rmvpe_pretrain_model/resolve/main/fcpe.pt)
|
||||
+ 放在`pretrain`目录下
|
||||
|
||||
|
||||
## 📊 数据集准备
|
||||
|
||||
@ -307,6 +318,7 @@ dio
|
||||
pm
|
||||
harvest
|
||||
rmvpe
|
||||
fcpe
|
||||
```
|
||||
|
||||
如果训练集过于嘈杂,请使用 crepe 处理 f0
|
||||
@ -365,7 +377,7 @@ python inference_main.py -m "logs/44k/G_30400.pth" -c "configs/config.json" -n "
|
||||
|
||||
可选项部分:部分具体见下一节
|
||||
+ `-lg` | `--linear_gradient`:两段音频切片的交叉淡入长度,如果强制切片后出现人声不连贯可调整该数值,如果连贯建议采用默认值 0,单位为秒
|
||||
+ `-f0p` | `--f0_predictor`:选择 F0 预测器,可选择 crepe,pm,dio,harvest,rmvpe, 默认为 pm(注意:crepe 为原 F0 使用均值滤波器)
|
||||
+ `-f0p` | `--f0_predictor`:选择 F0 预测器,可选择 crepe,pm,dio,harvest,rmvpe,fcpe, 默认为 pm(注意:crepe 为原 F0 使用均值滤波器)
|
||||
+ `-a` | `--auto_predict_f0`:语音转换自动预测音高,转换歌声时不要打开这个会严重跑调
|
||||
+ `-cm` | `--cluster_model_path`:聚类模型或特征检索索引路径,留空则自动设为各方案模型的默认路径,如果没有训练聚类或特征检索则随便填
|
||||
+ `-cr` | `--cluster_infer_ratio`:聚类方案或特征检索占比,范围 0-1,若没有训练聚类模型或特征检索则默认 0 即可
|
||||
|
77
configs_template/config_tiny_template.json
Normal file
77
configs_template/config_tiny_template.json
Normal file
@ -0,0 +1,77 @@
|
||||
{
|
||||
"train": {
|
||||
"log_interval": 200,
|
||||
"eval_interval": 800,
|
||||
"seed": 1234,
|
||||
"epochs": 10000,
|
||||
"learning_rate": 0.0001,
|
||||
"betas": [
|
||||
0.8,
|
||||
0.99
|
||||
],
|
||||
"eps": 1e-09,
|
||||
"batch_size": 6,
|
||||
"fp16_run": false,
|
||||
"half_type": "fp16",
|
||||
"lr_decay": 0.999875,
|
||||
"segment_size": 10240,
|
||||
"init_lr_ratio": 1,
|
||||
"warmup_epochs": 0,
|
||||
"c_mel": 45,
|
||||
"c_kl": 1.0,
|
||||
"use_sr": true,
|
||||
"max_speclen": 512,
|
||||
"port": "8001",
|
||||
"keep_ckpts": 3,
|
||||
"all_in_mem": false,
|
||||
"vol_aug":false
|
||||
},
|
||||
"data": {
|
||||
"training_files": "filelists/train.txt",
|
||||
"validation_files": "filelists/val.txt",
|
||||
"max_wav_value": 32768.0,
|
||||
"sampling_rate": 44100,
|
||||
"filter_length": 2048,
|
||||
"hop_length": 512,
|
||||
"win_length": 2048,
|
||||
"n_mel_channels": 80,
|
||||
"mel_fmin": 0.0,
|
||||
"mel_fmax": 22050,
|
||||
"unit_interpolate_mode":"nearest"
|
||||
},
|
||||
"model": {
|
||||
"inter_channels": 192,
|
||||
"hidden_channels": 192,
|
||||
"filter_channels": 512,
|
||||
"n_heads": 2,
|
||||
"n_layers": 6,
|
||||
"kernel_size": 3,
|
||||
"p_dropout": 0.1,
|
||||
"resblock": "1",
|
||||
"resblock_kernel_sizes": [3,7,11],
|
||||
"resblock_dilation_sizes": [[1,3,5], [1,3,5], [1,3,5]],
|
||||
"upsample_rates": [ 8, 8, 2, 2, 2],
|
||||
"upsample_initial_channel": 400,
|
||||
"upsample_kernel_sizes": [16,16, 4, 4, 4],
|
||||
"n_layers_q": 3,
|
||||
"n_flow_layer": 4,
|
||||
"use_spectral_norm": false,
|
||||
"gin_channels": 768,
|
||||
"ssl_dim": 768,
|
||||
"n_speakers": 200,
|
||||
"vocoder_name":"nsf-hifigan",
|
||||
"speech_encoder":"vec768l12",
|
||||
"speaker_embedding":false,
|
||||
"vol_embedding":false,
|
||||
"use_depthwise_conv":true,
|
||||
"flow_share_parameter": true,
|
||||
"use_automatic_f0_prediction": true
|
||||
},
|
||||
"spk": {
|
||||
"nyaru": 0,
|
||||
"huiyu": 1,
|
||||
"nen": 2,
|
||||
"paimon": 3,
|
||||
"yunhao": 4
|
||||
}
|
||||
}
|
20
export_index_for_onnx.py
Normal file
20
export_index_for_onnx.py
Normal file
@ -0,0 +1,20 @@
|
||||
import os
|
||||
import pickle
|
||||
|
||||
import faiss
|
||||
|
||||
path = "crs"
|
||||
indexs_file_path = f"checkpoints/{path}/feature_and_index.pkl"
|
||||
indexs_out_dir = f"checkpoints/{path}/"
|
||||
|
||||
with open("feature_and_index.pkl",mode="rb") as f:
|
||||
indexs = pickle.load(f)
|
||||
|
||||
for k in indexs:
|
||||
print(f"Save {k} index")
|
||||
faiss.write_index(
|
||||
indexs[k],
|
||||
os.path.join(indexs_out_dir,f"Index-{k}.index")
|
||||
)
|
||||
|
||||
print("Saved all index")
|
@ -203,9 +203,10 @@ class Svc(object):
|
||||
|
||||
def get_unit_f0(self, wav, tran, cluster_infer_ratio, speaker, f0_filter ,f0_predictor,cr_threshold=0.05):
|
||||
|
||||
f0_predictor_object = utils.get_f0_predictor(f0_predictor,hop_length=self.hop_size,sampling_rate=self.target_sample,device=self.dev,threshold=cr_threshold)
|
||||
|
||||
f0, uv = f0_predictor_object.compute_f0_uv(wav)
|
||||
if not hasattr(self,"f0_predictor_object") or self.f0_predictor_object is None or f0_predictor != self.f0_predictor_object.name:
|
||||
self.f0_predictor_object = utils.get_f0_predictor(f0_predictor,hop_length=self.hop_size,sampling_rate=self.target_sample,device=self.dev,threshold=cr_threshold)
|
||||
f0, uv = self.f0_predictor_object.compute_f0_uv(wav)
|
||||
|
||||
if f0_filter and sum(f0) == 0:
|
||||
raise F0FilterException("No voice detected")
|
||||
f0 = torch.FloatTensor(f0).to(self.dev)
|
||||
@ -215,8 +216,11 @@ class Svc(object):
|
||||
f0 = f0.unsqueeze(0)
|
||||
uv = uv.unsqueeze(0)
|
||||
|
||||
wav16k = librosa.resample(wav, orig_sr=self.target_sample, target_sr=16000)
|
||||
wav16k = torch.from_numpy(wav16k).to(self.dev)
|
||||
wav = torch.from_numpy(wav).to(self.dev)
|
||||
if not hasattr(self,"audio16k_resample_transform"):
|
||||
self.audio16k_resample_transform = torchaudio.transforms.Resample(self.target_sample, 16000).to(self.dev)
|
||||
wav16k = self.audio16k_resample_transform(wav[None,:])[0]
|
||||
|
||||
c = self.hubert_model.encoder(wav16k)
|
||||
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1],self.unit_interpolate_mode)
|
||||
|
||||
@ -248,7 +252,7 @@ class Svc(object):
|
||||
|
||||
c = c.unsqueeze(0)
|
||||
return c, f0, uv
|
||||
|
||||
|
||||
def infer(self, speaker, tran, raw_path,
|
||||
cluster_infer_ratio=0,
|
||||
auto_predict_f0=False,
|
||||
@ -263,7 +267,11 @@ class Svc(object):
|
||||
second_encoding = False,
|
||||
loudness_envelope_adjustment = 1
|
||||
):
|
||||
wav, sr = librosa.load(raw_path, sr=self.target_sample)
|
||||
torchaudio.set_audio_backend("soundfile")
|
||||
wav, sr = torchaudio.load(raw_path)
|
||||
if not hasattr(self,"audio_resample_transform") or self.audio16k_resample_transform.orig_freq != sr:
|
||||
self.audio_resample_transform = torchaudio.transforms.Resample(sr,self.target_sample)
|
||||
wav = self.audio_resample_transform(wav).numpy()[0]
|
||||
if spk_mix:
|
||||
c, f0, uv = self.get_unit_f0(wav, tran, 0, None, f0_filter,f0_predictor,cr_threshold=cr_threshold)
|
||||
n_frames = f0.size(1)
|
||||
@ -299,8 +307,9 @@ class Svc(object):
|
||||
if self.only_diffusion or self.shallow_diffusion:
|
||||
vol = self.volume_extractor.extract(audio[None,:])[None,:,None].to(self.dev) if vol is None else vol[:,:,None]
|
||||
if self.shallow_diffusion and second_encoding:
|
||||
audio16k = librosa.resample(audio.detach().cpu().numpy(), orig_sr=self.target_sample, target_sr=16000)
|
||||
audio16k = torch.from_numpy(audio16k).to(self.dev)
|
||||
if not hasattr(self,"audio16k_resample_transform"):
|
||||
self.audio16k_resample_transform = torchaudio.transforms.Resample(self.target_sample, 16000).to(self.dev)
|
||||
audio16k = self.audio16k_resample_transform(audio[None,:])[0]
|
||||
c = self.hubert_model.encoder(audio16k)
|
||||
c = utils.repeat_expand_2d(c.squeeze(0), f0.shape[1],self.unit_interpolate_mode)
|
||||
f0 = f0[:,:,None]
|
||||
|
@ -29,7 +29,7 @@ def main():
|
||||
parser.add_argument('-cm', '--cluster_model_path', type=str, default="", help='聚类模型或特征检索索引路径,留空则自动设为各方案模型的默认路径,如果没有训练聚类或特征检索则随便填')
|
||||
parser.add_argument('-cr', '--cluster_infer_ratio', type=float, default=0, help='聚类方案或特征检索占比,范围0-1,若没有训练聚类模型或特征检索则默认0即可')
|
||||
parser.add_argument('-lg', '--linear_gradient', type=float, default=0, help='两段音频切片的交叉淡入长度,如果强制切片后出现人声不连贯可调整该数值,如果连贯建议采用默认值0,单位为秒')
|
||||
parser.add_argument('-f0p', '--f0_predictor', type=str, default="pm", help='选择F0预测器,可选择crepe,pm,dio,harvest,rmvpe,默认为pm(注意:crepe为原F0使用均值滤波器)')
|
||||
parser.add_argument('-f0p', '--f0_predictor', type=str, default="pm", help='选择F0预测器,可选择crepe,pm,dio,harvest,rmvpe,fcpe默认为pm(注意:crepe为原F0使用均值滤波器)')
|
||||
parser.add_argument('-eh', '--enhance', action='store_true', default=False, help='是否使用NSF_HIFIGAN增强器,该选项对部分训练集少的模型有一定的音质增强效果,但是对训练好的模型有反面效果,默认关闭')
|
||||
parser.add_argument('-shd', '--shallow_diffusion', action='store_true', default=False, help='是否使用浅层扩散,使用后可解决一部分电音问题,默认关闭,该选项打开时,NSF_HIFIGAN增强器将会被禁止')
|
||||
parser.add_argument('-usm', '--use_spk_mix', action='store_true', default=False, help='是否使用角色融合')
|
||||
|
@ -13,6 +13,7 @@ class CrepeF0Predictor(F0Predictor):
|
||||
self.device = device
|
||||
self.threshold = threshold
|
||||
self.sampling_rate = sampling_rate
|
||||
self.name = "crepe"
|
||||
|
||||
def compute_f0(self,wav,p_len=None):
|
||||
x = torch.FloatTensor(wav).to(self.device)
|
||||
|
@ -10,6 +10,7 @@ class DioF0Predictor(F0Predictor):
|
||||
self.f0_min = f0_min
|
||||
self.f0_max = f0_max
|
||||
self.sampling_rate = sampling_rate
|
||||
self.name = "dio"
|
||||
|
||||
def interpolate_f0(self,f0):
|
||||
'''
|
||||
|
109
modules/F0Predictor/FCPEF0Predictor.py
Normal file
109
modules/F0Predictor/FCPEF0Predictor.py
Normal file
@ -0,0 +1,109 @@
|
||||
from typing import Union
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
|
||||
from modules.F0Predictor.F0Predictor import F0Predictor
|
||||
|
||||
from .fcpe.model import FCPEInfer
|
||||
|
||||
|
||||
class FCPEF0Predictor(F0Predictor):
|
||||
def __init__(self, hop_length=512, f0_min=50, f0_max=1100, dtype=torch.float32, device=None, sampling_rate=44100,
|
||||
threshold=0.05):
|
||||
self.fcpe = FCPEInfer(model_path="pretrain/fcpe.pt", device=device, dtype=dtype)
|
||||
self.hop_length = hop_length
|
||||
self.f0_min = f0_min
|
||||
self.f0_max = f0_max
|
||||
if device is None:
|
||||
self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
||||
else:
|
||||
self.device = device
|
||||
self.threshold = threshold
|
||||
self.sampling_rate = sampling_rate
|
||||
self.dtype = dtype
|
||||
self.name = "fcpe"
|
||||
|
||||
def repeat_expand(
|
||||
self, content: Union[torch.Tensor, np.ndarray], target_len: int, mode: str = "nearest"
|
||||
):
|
||||
ndim = content.ndim
|
||||
|
||||
if content.ndim == 1:
|
||||
content = content[None, None]
|
||||
elif content.ndim == 2:
|
||||
content = content[None]
|
||||
|
||||
assert content.ndim == 3
|
||||
|
||||
is_np = isinstance(content, np.ndarray)
|
||||
if is_np:
|
||||
content = torch.from_numpy(content)
|
||||
|
||||
results = torch.nn.functional.interpolate(content, size=target_len, mode=mode)
|
||||
|
||||
if is_np:
|
||||
results = results.numpy()
|
||||
|
||||
if ndim == 1:
|
||||
return results[0, 0]
|
||||
elif ndim == 2:
|
||||
return results[0]
|
||||
|
||||
def post_process(self, x, sampling_rate, f0, pad_to):
|
||||
if isinstance(f0, np.ndarray):
|
||||
f0 = torch.from_numpy(f0).float().to(x.device)
|
||||
|
||||
if pad_to is None:
|
||||
return f0
|
||||
|
||||
f0 = self.repeat_expand(f0, pad_to)
|
||||
|
||||
vuv_vector = torch.zeros_like(f0)
|
||||
vuv_vector[f0 > 0.0] = 1.0
|
||||
vuv_vector[f0 <= 0.0] = 0.0
|
||||
|
||||
# 去掉0频率, 并线性插值
|
||||
nzindex = torch.nonzero(f0).squeeze()
|
||||
f0 = torch.index_select(f0, dim=0, index=nzindex).cpu().numpy()
|
||||
time_org = self.hop_length / sampling_rate * nzindex.cpu().numpy()
|
||||
time_frame = np.arange(pad_to) * self.hop_length / sampling_rate
|
||||
|
||||
vuv_vector = F.interpolate(vuv_vector[None, None, :], size=pad_to)[0][0]
|
||||
|
||||
if f0.shape[0] <= 0:
|
||||
return torch.zeros(pad_to, dtype=torch.float, device=x.device).cpu().numpy(), vuv_vector.cpu().numpy()
|
||||
if f0.shape[0] == 1:
|
||||
return (torch.ones(pad_to, dtype=torch.float, device=x.device) * f0[
|
||||
0]).cpu().numpy(), vuv_vector.cpu().numpy()
|
||||
|
||||
# 大概可以用 torch 重写?
|
||||
f0 = np.interp(time_frame, time_org, f0, left=f0[0], right=f0[-1])
|
||||
# vuv_vector = np.ceil(scipy.ndimage.zoom(vuv_vector,pad_to/len(vuv_vector),order = 0))
|
||||
|
||||
return f0, vuv_vector.cpu().numpy()
|
||||
|
||||
def compute_f0(self, wav, p_len=None):
|
||||
x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
|
||||
if p_len is None:
|
||||
p_len = x.shape[0] // self.hop_length
|
||||
else:
|
||||
assert abs(p_len - x.shape[0] // self.hop_length) < 4, "pad length error"
|
||||
f0 = self.fcpe(x, sr=self.sampling_rate, threshold=self.threshold)[0,:,0]
|
||||
if torch.all(f0 == 0):
|
||||
rtn = f0.cpu().numpy() if p_len is None else np.zeros(p_len)
|
||||
return rtn, rtn
|
||||
return self.post_process(x, self.sampling_rate, f0, p_len)[0]
|
||||
|
||||
def compute_f0_uv(self, wav, p_len=None):
|
||||
x = torch.FloatTensor(wav).to(self.dtype).to(self.device)
|
||||
if p_len is None:
|
||||
p_len = x.shape[0] // self.hop_length
|
||||
else:
|
||||
assert abs(p_len - x.shape[0] // self.hop_length) < 4, "pad length error"
|
||||
f0 = self.fcpe(x, sr=self.sampling_rate, threshold=self.threshold)[0,:,0]
|
||||
if torch.all(f0 == 0):
|
||||
rtn = f0.cpu().numpy() if p_len is None else np.zeros(p_len)
|
||||
return rtn, rtn
|
||||
return self.post_process(x, self.sampling_rate, f0, p_len)
|
@ -10,6 +10,7 @@ class HarvestF0Predictor(F0Predictor):
|
||||
self.f0_min = f0_min
|
||||
self.f0_max = f0_max
|
||||
self.sampling_rate = sampling_rate
|
||||
self.name = "harvest"
|
||||
|
||||
def interpolate_f0(self,f0):
|
||||
'''
|
||||
|
@ -10,7 +10,7 @@ class PMF0Predictor(F0Predictor):
|
||||
self.f0_min = f0_min
|
||||
self.f0_max = f0_max
|
||||
self.sampling_rate = sampling_rate
|
||||
|
||||
self.name = "pm"
|
||||
|
||||
def interpolate_f0(self,f0):
|
||||
'''
|
||||
|
@ -22,6 +22,7 @@ class RMVPEF0Predictor(F0Predictor):
|
||||
self.threshold = threshold
|
||||
self.sampling_rate = sampling_rate
|
||||
self.dtype = dtype
|
||||
self.name = "rmvpe"
|
||||
|
||||
def repeat_expand(
|
||||
self, content: Union[torch.Tensor, np.ndarray], target_len: int, mode: str = "nearest"
|
||||
|
3
modules/F0Predictor/fcpe/__init__.py
Normal file
3
modules/F0Predictor/fcpe/__init__.py
Normal file
@ -0,0 +1,3 @@
|
||||
from .model import FCPEInfer # noqa: F401
|
||||
from .nvSTFT import STFT # noqa: F401
|
||||
from .pcmer import PCmer # noqa: F401
|
262
modules/F0Predictor/fcpe/model.py
Normal file
262
modules/F0Predictor/fcpe/model.py
Normal file
@ -0,0 +1,262 @@
|
||||
import numpy as np
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.nn.functional as F
|
||||
from torch.nn.utils import weight_norm
|
||||
from torchaudio.transforms import Resample
|
||||
|
||||
from .nvSTFT import STFT
|
||||
from .pcmer import PCmer
|
||||
|
||||
|
||||
def l2_regularization(model, l2_alpha):
|
||||
l2_loss = []
|
||||
for module in model.modules():
|
||||
if type(module) is nn.Conv2d:
|
||||
l2_loss.append((module.weight ** 2).sum() / 2.0)
|
||||
return l2_alpha * sum(l2_loss)
|
||||
|
||||
|
||||
class FCPE(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
input_channel=128,
|
||||
out_dims=360,
|
||||
n_layers=12,
|
||||
n_chans=512,
|
||||
use_siren=False,
|
||||
use_full=False,
|
||||
loss_mse_scale=10,
|
||||
loss_l2_regularization=False,
|
||||
loss_l2_regularization_scale=1,
|
||||
loss_grad1_mse=False,
|
||||
loss_grad1_mse_scale=1,
|
||||
f0_max=1975.5,
|
||||
f0_min=32.70,
|
||||
confidence=False,
|
||||
threshold=0.05,
|
||||
use_input_conv=True
|
||||
):
|
||||
super().__init__()
|
||||
if use_siren is True:
|
||||
raise ValueError("Siren is not supported yet.")
|
||||
if use_full is True:
|
||||
raise ValueError("Full model is not supported yet.")
|
||||
|
||||
self.loss_mse_scale = loss_mse_scale if (loss_mse_scale is not None) else 10
|
||||
self.loss_l2_regularization = loss_l2_regularization if (loss_l2_regularization is not None) else False
|
||||
self.loss_l2_regularization_scale = loss_l2_regularization_scale if (loss_l2_regularization_scale
|
||||
is not None) else 1
|
||||
self.loss_grad1_mse = loss_grad1_mse if (loss_grad1_mse is not None) else False
|
||||
self.loss_grad1_mse_scale = loss_grad1_mse_scale if (loss_grad1_mse_scale is not None) else 1
|
||||
self.f0_max = f0_max if (f0_max is not None) else 1975.5
|
||||
self.f0_min = f0_min if (f0_min is not None) else 32.70
|
||||
self.confidence = confidence if (confidence is not None) else False
|
||||
self.threshold = threshold if (threshold is not None) else 0.05
|
||||
self.use_input_conv = use_input_conv if (use_input_conv is not None) else True
|
||||
|
||||
self.cent_table_b = torch.Tensor(
|
||||
np.linspace(self.f0_to_cent(torch.Tensor([f0_min]))[0], self.f0_to_cent(torch.Tensor([f0_max]))[0],
|
||||
out_dims))
|
||||
self.register_buffer("cent_table", self.cent_table_b)
|
||||
|
||||
# conv in stack
|
||||
_leaky = nn.LeakyReLU()
|
||||
self.stack = nn.Sequential(
|
||||
nn.Conv1d(input_channel, n_chans, 3, 1, 1),
|
||||
nn.GroupNorm(4, n_chans),
|
||||
_leaky,
|
||||
nn.Conv1d(n_chans, n_chans, 3, 1, 1))
|
||||
|
||||
# transformer
|
||||
self.decoder = PCmer(
|
||||
num_layers=n_layers,
|
||||
num_heads=8,
|
||||
dim_model=n_chans,
|
||||
dim_keys=n_chans,
|
||||
dim_values=n_chans,
|
||||
residual_dropout=0.1,
|
||||
attention_dropout=0.1)
|
||||
self.norm = nn.LayerNorm(n_chans)
|
||||
|
||||
# out
|
||||
self.n_out = out_dims
|
||||
self.dense_out = weight_norm(
|
||||
nn.Linear(n_chans, self.n_out))
|
||||
|
||||
def forward(self, mel, infer=True, gt_f0=None, return_hz_f0=False, cdecoder = "local_argmax"):
|
||||
"""
|
||||
input:
|
||||
B x n_frames x n_unit
|
||||
return:
|
||||
dict of B x n_frames x feat
|
||||
"""
|
||||
if cdecoder == "argmax":
|
||||
self.cdecoder = self.cents_decoder
|
||||
elif cdecoder == "local_argmax":
|
||||
self.cdecoder = self.cents_local_decoder
|
||||
if self.use_input_conv:
|
||||
x = self.stack(mel.transpose(1, 2)).transpose(1, 2)
|
||||
else:
|
||||
x = mel
|
||||
x = self.decoder(x)
|
||||
x = self.norm(x)
|
||||
x = self.dense_out(x) # [B,N,D]
|
||||
x = torch.sigmoid(x)
|
||||
if not infer:
|
||||
gt_cent_f0 = self.f0_to_cent(gt_f0) # mel f0 #[B,N,1]
|
||||
gt_cent_f0 = self.gaussian_blurred_cent(gt_cent_f0) # #[B,N,out_dim]
|
||||
loss_all = self.loss_mse_scale * F.binary_cross_entropy(x, gt_cent_f0) # bce loss
|
||||
# l2 regularization
|
||||
if self.loss_l2_regularization:
|
||||
loss_all = loss_all + l2_regularization(model=self, l2_alpha=self.loss_l2_regularization_scale)
|
||||
x = loss_all
|
||||
if infer:
|
||||
x = self.cdecoder(x)
|
||||
x = self.cent_to_f0(x)
|
||||
if not return_hz_f0:
|
||||
x = (1 + x / 700).log()
|
||||
return x
|
||||
|
||||
def cents_decoder(self, y, mask=True):
|
||||
B, N, _ = y.size()
|
||||
ci = self.cent_table[None, None, :].expand(B, N, -1)
|
||||
rtn = torch.sum(ci * y, dim=-1, keepdim=True) / torch.sum(y, dim=-1, keepdim=True) # cents: [B,N,1]
|
||||
if mask:
|
||||
confident = torch.max(y, dim=-1, keepdim=True)[0]
|
||||
confident_mask = torch.ones_like(confident)
|
||||
confident_mask[confident <= self.threshold] = float("-INF")
|
||||
rtn = rtn * confident_mask
|
||||
if self.confidence:
|
||||
return rtn, confident
|
||||
else:
|
||||
return rtn
|
||||
|
||||
def cents_local_decoder(self, y, mask=True):
|
||||
B, N, _ = y.size()
|
||||
ci = self.cent_table[None, None, :].expand(B, N, -1)
|
||||
confident, max_index = torch.max(y, dim=-1, keepdim=True)
|
||||
local_argmax_index = torch.arange(0,8).to(max_index.device) + (max_index - 4)
|
||||
local_argmax_index[local_argmax_index<0] = 0
|
||||
local_argmax_index[local_argmax_index>=self.n_out] = self.n_out - 1
|
||||
ci_l = torch.gather(ci,-1,local_argmax_index)
|
||||
y_l = torch.gather(y,-1,local_argmax_index)
|
||||
rtn = torch.sum(ci_l * y_l, dim=-1, keepdim=True) / torch.sum(y_l, dim=-1, keepdim=True) # cents: [B,N,1]
|
||||
if mask:
|
||||
confident_mask = torch.ones_like(confident)
|
||||
confident_mask[confident <= self.threshold] = float("-INF")
|
||||
rtn = rtn * confident_mask
|
||||
if self.confidence:
|
||||
return rtn, confident
|
||||
else:
|
||||
return rtn
|
||||
|
||||
def cent_to_f0(self, cent):
|
||||
return 10. * 2 ** (cent / 1200.)
|
||||
|
||||
def f0_to_cent(self, f0):
|
||||
return 1200. * torch.log2(f0 / 10.)
|
||||
|
||||
def gaussian_blurred_cent(self, cents): # cents: [B,N,1]
|
||||
mask = (cents > 0.1) & (cents < (1200. * np.log2(self.f0_max / 10.)))
|
||||
B, N, _ = cents.size()
|
||||
ci = self.cent_table[None, None, :].expand(B, N, -1)
|
||||
return torch.exp(-torch.square(ci - cents) / 1250) * mask.float()
|
||||
|
||||
|
||||
class FCPEInfer:
|
||||
def __init__(self, model_path, device=None, dtype=torch.float32):
|
||||
if device is None:
|
||||
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
||||
self.device = device
|
||||
ckpt = torch.load(model_path, map_location=torch.device(self.device))
|
||||
self.args = DotDict(ckpt["config"])
|
||||
self.dtype = dtype
|
||||
model = FCPE(
|
||||
input_channel=self.args.model.input_channel,
|
||||
out_dims=self.args.model.out_dims,
|
||||
n_layers=self.args.model.n_layers,
|
||||
n_chans=self.args.model.n_chans,
|
||||
use_siren=self.args.model.use_siren,
|
||||
use_full=self.args.model.use_full,
|
||||
loss_mse_scale=self.args.loss.loss_mse_scale,
|
||||
loss_l2_regularization=self.args.loss.loss_l2_regularization,
|
||||
loss_l2_regularization_scale=self.args.loss.loss_l2_regularization_scale,
|
||||
loss_grad1_mse=self.args.loss.loss_grad1_mse,
|
||||
loss_grad1_mse_scale=self.args.loss.loss_grad1_mse_scale,
|
||||
f0_max=self.args.model.f0_max,
|
||||
f0_min=self.args.model.f0_min,
|
||||
confidence=self.args.model.confidence,
|
||||
)
|
||||
model.to(self.device).to(self.dtype)
|
||||
model.load_state_dict(ckpt['model'])
|
||||
model.eval()
|
||||
self.model = model
|
||||
self.wav2mel = Wav2Mel(self.args, dtype=self.dtype, device=self.device)
|
||||
|
||||
@torch.no_grad()
|
||||
def __call__(self, audio, sr, threshold=0.05):
|
||||
self.model.threshold = threshold
|
||||
audio = audio[None,:]
|
||||
mel = self.wav2mel(audio=audio, sample_rate=sr).to(self.dtype)
|
||||
f0 = self.model(mel=mel, infer=True, return_hz_f0=True)
|
||||
return f0
|
||||
|
||||
|
||||
class Wav2Mel:
|
||||
|
||||
def __init__(self, args, device=None, dtype=torch.float32):
|
||||
# self.args = args
|
||||
self.sampling_rate = args.mel.sampling_rate
|
||||
self.hop_size = args.mel.hop_size
|
||||
if device is None:
|
||||
device = 'cuda' if torch.cuda.is_available() else 'cpu'
|
||||
self.device = device
|
||||
self.dtype = dtype
|
||||
self.stft = STFT(
|
||||
args.mel.sampling_rate,
|
||||
args.mel.num_mels,
|
||||
args.mel.n_fft,
|
||||
args.mel.win_size,
|
||||
args.mel.hop_size,
|
||||
args.mel.fmin,
|
||||
args.mel.fmax
|
||||
)
|
||||
self.resample_kernel = {}
|
||||
|
||||
def extract_nvstft(self, audio, keyshift=0, train=False):
|
||||
mel = self.stft.get_mel(audio, keyshift=keyshift, train=train).transpose(1, 2) # B, n_frames, bins
|
||||
return mel
|
||||
|
||||
def extract_mel(self, audio, sample_rate, keyshift=0, train=False):
|
||||
audio = audio.to(self.dtype).to(self.device)
|
||||
# resample
|
||||
if sample_rate == self.sampling_rate:
|
||||
audio_res = audio
|
||||
else:
|
||||
key_str = str(sample_rate)
|
||||
if key_str not in self.resample_kernel:
|
||||
self.resample_kernel[key_str] = Resample(sample_rate, self.sampling_rate, lowpass_filter_width=128)
|
||||
self.resample_kernel[key_str] = self.resample_kernel[key_str].to(self.dtype).to(self.device)
|
||||
audio_res = self.resample_kernel[key_str](audio)
|
||||
|
||||
# extract
|
||||
mel = self.extract_nvstft(audio_res, keyshift=keyshift, train=train) # B, n_frames, bins
|
||||
n_frames = int(audio.shape[1] // self.hop_size) + 1
|
||||
if n_frames > int(mel.shape[1]):
|
||||
mel = torch.cat((mel, mel[:, -1:, :]), 1)
|
||||
if n_frames < int(mel.shape[1]):
|
||||
mel = mel[:, :n_frames, :]
|
||||
return mel
|
||||
|
||||
def __call__(self, audio, sample_rate, keyshift=0, train=False):
|
||||
return self.extract_mel(audio, sample_rate, keyshift=keyshift, train=train)
|
||||
|
||||
|
||||
class DotDict(dict):
|
||||
def __getattr__(*args):
|
||||
val = dict.get(*args)
|
||||
return DotDict(val) if type(val) is dict else val
|
||||
|
||||
__setattr__ = dict.__setitem__
|
||||
__delattr__ = dict.__delitem__
|
133
modules/F0Predictor/fcpe/nvSTFT.py
Normal file
133
modules/F0Predictor/fcpe/nvSTFT.py
Normal file
@ -0,0 +1,133 @@
|
||||
import os
|
||||
|
||||
import librosa
|
||||
import numpy as np
|
||||
import soundfile as sf
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
import torch.utils.data
|
||||
from librosa.filters import mel as librosa_mel_fn
|
||||
|
||||
os.environ["LRU_CACHE_CAPACITY"] = "3"
|
||||
|
||||
def load_wav_to_torch(full_path, target_sr=None, return_empty_on_exception=False):
|
||||
sampling_rate = None
|
||||
try:
|
||||
data, sampling_rate = sf.read(full_path, always_2d=True)# than soundfile.
|
||||
except Exception as ex:
|
||||
print(f"'{full_path}' failed to load.\nException:")
|
||||
print(ex)
|
||||
if return_empty_on_exception:
|
||||
return [], sampling_rate or target_sr or 48000
|
||||
else:
|
||||
raise Exception(ex)
|
||||
|
||||
if len(data.shape) > 1:
|
||||
data = data[:, 0]
|
||||
assert len(data) > 2# check duration of audio file is > 2 samples (because otherwise the slice operation was on the wrong dimension)
|
||||
|
||||
if np.issubdtype(data.dtype, np.integer): # if audio data is type int
|
||||
max_mag = -np.iinfo(data.dtype).min # maximum magnitude = min possible value of intXX
|
||||
else: # if audio data is type fp32
|
||||
max_mag = max(np.amax(data), -np.amin(data))
|
||||
max_mag = (2**31)+1 if max_mag > (2**15) else ((2**15)+1 if max_mag > 1.01 else 1.0) # data should be either 16-bit INT, 32-bit INT or [-1 to 1] float32
|
||||
|
||||
data = torch.FloatTensor(data.astype(np.float32))/max_mag
|
||||
|
||||
if (torch.isinf(data) | torch.isnan(data)).any() and return_empty_on_exception:# resample will crash with inf/NaN inputs. return_empty_on_exception will return empty arr instead of except
|
||||
return [], sampling_rate or target_sr or 48000
|
||||
if target_sr is not None and sampling_rate != target_sr:
|
||||
data = torch.from_numpy(librosa.core.resample(data.numpy(), orig_sr=sampling_rate, target_sr=target_sr))
|
||||
sampling_rate = target_sr
|
||||
|
||||
return data, sampling_rate
|
||||
|
||||
def dynamic_range_compression(x, C=1, clip_val=1e-5):
|
||||
return np.log(np.clip(x, a_min=clip_val, a_max=None) * C)
|
||||
|
||||
def dynamic_range_decompression(x, C=1):
|
||||
return np.exp(x) / C
|
||||
|
||||
def dynamic_range_compression_torch(x, C=1, clip_val=1e-5):
|
||||
return torch.log(torch.clamp(x, min=clip_val) * C)
|
||||
|
||||
def dynamic_range_decompression_torch(x, C=1):
|
||||
return torch.exp(x) / C
|
||||
|
||||
class STFT():
|
||||
def __init__(self, sr=22050, n_mels=80, n_fft=1024, win_size=1024, hop_length=256, fmin=20, fmax=11025, clip_val=1e-5):
|
||||
self.target_sr = sr
|
||||
|
||||
self.n_mels = n_mels
|
||||
self.n_fft = n_fft
|
||||
self.win_size = win_size
|
||||
self.hop_length = hop_length
|
||||
self.fmin = fmin
|
||||
self.fmax = fmax
|
||||
self.clip_val = clip_val
|
||||
self.mel_basis = {}
|
||||
self.hann_window = {}
|
||||
|
||||
def get_mel(self, y, keyshift=0, speed=1, center=False, train=False):
|
||||
sampling_rate = self.target_sr
|
||||
n_mels = self.n_mels
|
||||
n_fft = self.n_fft
|
||||
win_size = self.win_size
|
||||
hop_length = self.hop_length
|
||||
fmin = self.fmin
|
||||
fmax = self.fmax
|
||||
clip_val = self.clip_val
|
||||
|
||||
factor = 2 ** (keyshift / 12)
|
||||
n_fft_new = int(np.round(n_fft * factor))
|
||||
win_size_new = int(np.round(win_size * factor))
|
||||
hop_length_new = int(np.round(hop_length * speed))
|
||||
if not train:
|
||||
mel_basis = self.mel_basis
|
||||
hann_window = self.hann_window
|
||||
else:
|
||||
mel_basis = {}
|
||||
hann_window = {}
|
||||
|
||||
if torch.min(y) < -1.:
|
||||
print('min value is ', torch.min(y))
|
||||
if torch.max(y) > 1.:
|
||||
print('max value is ', torch.max(y))
|
||||
|
||||
mel_basis_key = str(fmax)+'_'+str(y.device)
|
||||
if mel_basis_key not in mel_basis:
|
||||
mel = librosa_mel_fn(sr=sampling_rate, n_fft=n_fft, n_mels=n_mels, fmin=fmin, fmax=fmax)
|
||||
mel_basis[mel_basis_key] = torch.from_numpy(mel).float().to(y.device)
|
||||
|
||||
keyshift_key = str(keyshift)+'_'+str(y.device)
|
||||
if keyshift_key not in hann_window:
|
||||
hann_window[keyshift_key] = torch.hann_window(win_size_new).to(y.device)
|
||||
|
||||
pad_left = (win_size_new - hop_length_new) //2
|
||||
pad_right = max((win_size_new- hop_length_new + 1) //2, win_size_new - y.size(-1) - pad_left)
|
||||
if pad_right < y.size(-1):
|
||||
mode = 'reflect'
|
||||
else:
|
||||
mode = 'constant'
|
||||
y = torch.nn.functional.pad(y.unsqueeze(1), (pad_left, pad_right), mode = mode)
|
||||
y = y.squeeze(1)
|
||||
|
||||
spec = torch.stft(y, n_fft_new, hop_length=hop_length_new, win_length=win_size_new, window=hann_window[keyshift_key],
|
||||
center=center, pad_mode='reflect', normalized=False, onesided=True, return_complex=True)
|
||||
spec = torch.sqrt(spec.real.pow(2) + spec.imag.pow(2) + (1e-9))
|
||||
if keyshift != 0:
|
||||
size = n_fft // 2 + 1
|
||||
resize = spec.size(1)
|
||||
if resize < size:
|
||||
spec = F.pad(spec, (0, 0, 0, size-resize))
|
||||
spec = spec[:, :size, :] * win_size / win_size_new
|
||||
spec = torch.matmul(mel_basis[mel_basis_key], spec)
|
||||
spec = dynamic_range_compression_torch(spec, clip_val=clip_val)
|
||||
return spec
|
||||
|
||||
def __call__(self, audiopath):
|
||||
audio, sr = load_wav_to_torch(audiopath, target_sr=self.target_sr)
|
||||
spect = self.get_mel(audio.unsqueeze(0)).squeeze(0)
|
||||
return spect
|
||||
|
||||
stft = STFT()
|
369
modules/F0Predictor/fcpe/pcmer.py
Normal file
369
modules/F0Predictor/fcpe/pcmer.py
Normal file
@ -0,0 +1,369 @@
|
||||
import math
|
||||
from functools import partial
|
||||
|
||||
import torch
|
||||
import torch.nn.functional as F
|
||||
from einops import rearrange, repeat
|
||||
from local_attention import LocalAttention
|
||||
from torch import nn
|
||||
|
||||
#import fast_transformers.causal_product.causal_product_cuda
|
||||
|
||||
def softmax_kernel(data, *, projection_matrix, is_query, normalize_data=True, eps=1e-4, device = None):
|
||||
b, h, *_ = data.shape
|
||||
# (batch size, head, length, model_dim)
|
||||
|
||||
# normalize model dim
|
||||
data_normalizer = (data.shape[-1] ** -0.25) if normalize_data else 1.
|
||||
|
||||
# what is ration?, projection_matrix.shape[0] --> 266
|
||||
|
||||
ratio = (projection_matrix.shape[0] ** -0.5)
|
||||
|
||||
projection = repeat(projection_matrix, 'j d -> b h j d', b = b, h = h)
|
||||
projection = projection.type_as(data)
|
||||
|
||||
#data_dash = w^T x
|
||||
data_dash = torch.einsum('...id,...jd->...ij', (data_normalizer * data), projection)
|
||||
|
||||
|
||||
# diag_data = D**2
|
||||
diag_data = data ** 2
|
||||
diag_data = torch.sum(diag_data, dim=-1)
|
||||
diag_data = (diag_data / 2.0) * (data_normalizer ** 2)
|
||||
diag_data = diag_data.unsqueeze(dim=-1)
|
||||
|
||||
#print ()
|
||||
if is_query:
|
||||
data_dash = ratio * (
|
||||
torch.exp(data_dash - diag_data -
|
||||
torch.max(data_dash, dim=-1, keepdim=True).values) + eps)
|
||||
else:
|
||||
data_dash = ratio * (
|
||||
torch.exp(data_dash - diag_data + eps))#- torch.max(data_dash)) + eps)
|
||||
|
||||
return data_dash.type_as(data)
|
||||
|
||||
def orthogonal_matrix_chunk(cols, qr_uniform_q = False, device = None):
|
||||
unstructured_block = torch.randn((cols, cols), device = device)
|
||||
q, r = torch.linalg.qr(unstructured_block.cpu(), mode='reduced')
|
||||
q, r = map(lambda t: t.to(device), (q, r))
|
||||
|
||||
# proposed by @Parskatt
|
||||
# to make sure Q is uniform https://arxiv.org/pdf/math-ph/0609050.pdf
|
||||
if qr_uniform_q:
|
||||
d = torch.diag(r, 0)
|
||||
q *= d.sign()
|
||||
return q.t()
|
||||
def exists(val):
|
||||
return val is not None
|
||||
|
||||
def empty(tensor):
|
||||
return tensor.numel() == 0
|
||||
|
||||
def default(val, d):
|
||||
return val if exists(val) else d
|
||||
|
||||
def cast_tuple(val):
|
||||
return (val,) if not isinstance(val, tuple) else val
|
||||
|
||||
class PCmer(nn.Module):
|
||||
"""The encoder that is used in the Transformer model."""
|
||||
|
||||
def __init__(self,
|
||||
num_layers,
|
||||
num_heads,
|
||||
dim_model,
|
||||
dim_keys,
|
||||
dim_values,
|
||||
residual_dropout,
|
||||
attention_dropout):
|
||||
super().__init__()
|
||||
self.num_layers = num_layers
|
||||
self.num_heads = num_heads
|
||||
self.dim_model = dim_model
|
||||
self.dim_values = dim_values
|
||||
self.dim_keys = dim_keys
|
||||
self.residual_dropout = residual_dropout
|
||||
self.attention_dropout = attention_dropout
|
||||
|
||||
self._layers = nn.ModuleList([_EncoderLayer(self) for _ in range(num_layers)])
|
||||
|
||||
# METHODS ########################################################################################################
|
||||
|
||||
def forward(self, phone, mask=None):
|
||||
|
||||
# apply all layers to the input
|
||||
for (i, layer) in enumerate(self._layers):
|
||||
phone = layer(phone, mask)
|
||||
# provide the final sequence
|
||||
return phone
|
||||
|
||||
|
||||
# ==================================================================================================================== #
|
||||
# CLASS _ E N C O D E R L A Y E R #
|
||||
# ==================================================================================================================== #
|
||||
|
||||
|
||||
class _EncoderLayer(nn.Module):
|
||||
"""One layer of the encoder.
|
||||
|
||||
Attributes:
|
||||
attn: (:class:`mha.MultiHeadAttention`): The attention mechanism that is used to read the input sequence.
|
||||
feed_forward (:class:`ffl.FeedForwardLayer`): The feed-forward layer on top of the attention mechanism.
|
||||
"""
|
||||
|
||||
def __init__(self, parent: PCmer):
|
||||
"""Creates a new instance of ``_EncoderLayer``.
|
||||
|
||||
Args:
|
||||
parent (Encoder): The encoder that the layers is created for.
|
||||
"""
|
||||
super().__init__()
|
||||
|
||||
|
||||
self.conformer = ConformerConvModule(parent.dim_model)
|
||||
self.norm = nn.LayerNorm(parent.dim_model)
|
||||
self.dropout = nn.Dropout(parent.residual_dropout)
|
||||
|
||||
# selfatt -> fastatt: performer!
|
||||
self.attn = SelfAttention(dim = parent.dim_model,
|
||||
heads = parent.num_heads,
|
||||
causal = False)
|
||||
|
||||
# METHODS ########################################################################################################
|
||||
|
||||
def forward(self, phone, mask=None):
|
||||
|
||||
# compute attention sub-layer
|
||||
phone = phone + (self.attn(self.norm(phone), mask=mask))
|
||||
|
||||
phone = phone + (self.conformer(phone))
|
||||
|
||||
return phone
|
||||
|
||||
def calc_same_padding(kernel_size):
|
||||
pad = kernel_size // 2
|
||||
return (pad, pad - (kernel_size + 1) % 2)
|
||||
|
||||
# helper classes
|
||||
|
||||
class Swish(nn.Module):
|
||||
def forward(self, x):
|
||||
return x * x.sigmoid()
|
||||
|
||||
class Transpose(nn.Module):
|
||||
def __init__(self, dims):
|
||||
super().__init__()
|
||||
assert len(dims) == 2, 'dims must be a tuple of two dimensions'
|
||||
self.dims = dims
|
||||
|
||||
def forward(self, x):
|
||||
return x.transpose(*self.dims)
|
||||
|
||||
class GLU(nn.Module):
|
||||
def __init__(self, dim):
|
||||
super().__init__()
|
||||
self.dim = dim
|
||||
|
||||
def forward(self, x):
|
||||
out, gate = x.chunk(2, dim=self.dim)
|
||||
return out * gate.sigmoid()
|
||||
|
||||
class DepthWiseConv1d(nn.Module):
|
||||
def __init__(self, chan_in, chan_out, kernel_size, padding):
|
||||
super().__init__()
|
||||
self.padding = padding
|
||||
self.conv = nn.Conv1d(chan_in, chan_out, kernel_size, groups = chan_in)
|
||||
|
||||
def forward(self, x):
|
||||
x = F.pad(x, self.padding)
|
||||
return self.conv(x)
|
||||
|
||||
class ConformerConvModule(nn.Module):
|
||||
def __init__(
|
||||
self,
|
||||
dim,
|
||||
causal = False,
|
||||
expansion_factor = 2,
|
||||
kernel_size = 31,
|
||||
dropout = 0.):
|
||||
super().__init__()
|
||||
|
||||
inner_dim = dim * expansion_factor
|
||||
padding = calc_same_padding(kernel_size) if not causal else (kernel_size - 1, 0)
|
||||
|
||||
self.net = nn.Sequential(
|
||||
nn.LayerNorm(dim),
|
||||
Transpose((1, 2)),
|
||||
nn.Conv1d(dim, inner_dim * 2, 1),
|
||||
GLU(dim=1),
|
||||
DepthWiseConv1d(inner_dim, inner_dim, kernel_size = kernel_size, padding = padding),
|
||||
#nn.BatchNorm1d(inner_dim) if not causal else nn.Identity(),
|
||||
Swish(),
|
||||
nn.Conv1d(inner_dim, dim, 1),
|
||||
Transpose((1, 2)),
|
||||
nn.Dropout(dropout)
|
||||
)
|
||||
|
||||
def forward(self, x):
|
||||
return self.net(x)
|
||||
|
||||
def linear_attention(q, k, v):
|
||||
if v is None:
|
||||
#print (k.size(), q.size())
|
||||
out = torch.einsum('...ed,...nd->...ne', k, q)
|
||||
return out
|
||||
|
||||
else:
|
||||
k_cumsum = k.sum(dim = -2)
|
||||
#k_cumsum = k.sum(dim = -2)
|
||||
D_inv = 1. / (torch.einsum('...nd,...d->...n', q, k_cumsum.type_as(q)) + 1e-8)
|
||||
|
||||
context = torch.einsum('...nd,...ne->...de', k, v)
|
||||
#print ("TRUEEE: ", context.size(), q.size(), D_inv.size())
|
||||
out = torch.einsum('...de,...nd,...n->...ne', context, q, D_inv)
|
||||
return out
|
||||
|
||||
def gaussian_orthogonal_random_matrix(nb_rows, nb_columns, scaling = 0, qr_uniform_q = False, device = None):
|
||||
nb_full_blocks = int(nb_rows / nb_columns)
|
||||
#print (nb_full_blocks)
|
||||
block_list = []
|
||||
|
||||
for _ in range(nb_full_blocks):
|
||||
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q = qr_uniform_q, device = device)
|
||||
block_list.append(q)
|
||||
# block_list[n] is a orthogonal matrix ... (model_dim * model_dim)
|
||||
#print (block_list[0].size(), torch.einsum('...nd,...nd->...n', block_list[0], torch.roll(block_list[0],1,1)))
|
||||
#print (nb_rows, nb_full_blocks, nb_columns)
|
||||
remaining_rows = nb_rows - nb_full_blocks * nb_columns
|
||||
#print (remaining_rows)
|
||||
if remaining_rows > 0:
|
||||
q = orthogonal_matrix_chunk(nb_columns, qr_uniform_q = qr_uniform_q, device = device)
|
||||
#print (q[:remaining_rows].size())
|
||||
block_list.append(q[:remaining_rows])
|
||||
|
||||
final_matrix = torch.cat(block_list)
|
||||
|
||||
if scaling == 0:
|
||||
multiplier = torch.randn((nb_rows, nb_columns), device = device).norm(dim = 1)
|
||||
elif scaling == 1:
|
||||
multiplier = math.sqrt((float(nb_columns))) * torch.ones((nb_rows,), device = device)
|
||||
else:
|
||||
raise ValueError(f'Invalid scaling {scaling}')
|
||||
|
||||
return torch.diag(multiplier) @ final_matrix
|
||||
|
||||
class FastAttention(nn.Module):
|
||||
def __init__(self, dim_heads, nb_features = None, ortho_scaling = 0, causal = False, generalized_attention = False, kernel_fn = nn.ReLU(), qr_uniform_q = False, no_projection = False):
|
||||
super().__init__()
|
||||
nb_features = default(nb_features, int(dim_heads * math.log(dim_heads)))
|
||||
|
||||
self.dim_heads = dim_heads
|
||||
self.nb_features = nb_features
|
||||
self.ortho_scaling = ortho_scaling
|
||||
|
||||
self.create_projection = partial(gaussian_orthogonal_random_matrix, nb_rows = self.nb_features, nb_columns = dim_heads, scaling = ortho_scaling, qr_uniform_q = qr_uniform_q)
|
||||
projection_matrix = self.create_projection()
|
||||
self.register_buffer('projection_matrix', projection_matrix)
|
||||
|
||||
self.generalized_attention = generalized_attention
|
||||
self.kernel_fn = kernel_fn
|
||||
|
||||
# if this is turned on, no projection will be used
|
||||
# queries and keys will be softmax-ed as in the original efficient attention paper
|
||||
self.no_projection = no_projection
|
||||
|
||||
self.causal = causal
|
||||
|
||||
@torch.no_grad()
|
||||
def redraw_projection_matrix(self):
|
||||
projections = self.create_projection()
|
||||
self.projection_matrix.copy_(projections)
|
||||
del projections
|
||||
|
||||
def forward(self, q, k, v):
|
||||
device = q.device
|
||||
|
||||
if self.no_projection:
|
||||
q = q.softmax(dim = -1)
|
||||
k = torch.exp(k) if self.causal else k.softmax(dim = -2)
|
||||
else:
|
||||
create_kernel = partial(softmax_kernel, projection_matrix = self.projection_matrix, device = device)
|
||||
|
||||
q = create_kernel(q, is_query = True)
|
||||
k = create_kernel(k, is_query = False)
|
||||
|
||||
attn_fn = linear_attention if not self.causal else self.causal_linear_fn
|
||||
if v is None:
|
||||
out = attn_fn(q, k, None)
|
||||
return out
|
||||
else:
|
||||
out = attn_fn(q, k, v)
|
||||
return out
|
||||
class SelfAttention(nn.Module):
|
||||
def __init__(self, dim, causal = False, heads = 8, dim_head = 64, local_heads = 0, local_window_size = 256, nb_features = None, feature_redraw_interval = 1000, generalized_attention = False, kernel_fn = nn.ReLU(), qr_uniform_q = False, dropout = 0., no_projection = False):
|
||||
super().__init__()
|
||||
assert dim % heads == 0, 'dimension must be divisible by number of heads'
|
||||
dim_head = default(dim_head, dim // heads)
|
||||
inner_dim = dim_head * heads
|
||||
self.fast_attention = FastAttention(dim_head, nb_features, causal = causal, generalized_attention = generalized_attention, kernel_fn = kernel_fn, qr_uniform_q = qr_uniform_q, no_projection = no_projection)
|
||||
|
||||
self.heads = heads
|
||||
self.global_heads = heads - local_heads
|
||||
self.local_attn = LocalAttention(window_size = local_window_size, causal = causal, autopad = True, dropout = dropout, look_forward = int(not causal), rel_pos_emb_config = (dim_head, local_heads)) if local_heads > 0 else None
|
||||
|
||||
#print (heads, nb_features, dim_head)
|
||||
#name_embedding = torch.zeros(110, heads, dim_head, dim_head)
|
||||
#self.name_embedding = nn.Parameter(name_embedding, requires_grad=True)
|
||||
|
||||
|
||||
self.to_q = nn.Linear(dim, inner_dim)
|
||||
self.to_k = nn.Linear(dim, inner_dim)
|
||||
self.to_v = nn.Linear(dim, inner_dim)
|
||||
self.to_out = nn.Linear(inner_dim, dim)
|
||||
self.dropout = nn.Dropout(dropout)
|
||||
|
||||
@torch.no_grad()
|
||||
def redraw_projection_matrix(self):
|
||||
self.fast_attention.redraw_projection_matrix()
|
||||
#torch.nn.init.zeros_(self.name_embedding)
|
||||
#print (torch.sum(self.name_embedding))
|
||||
def forward(self, x, context = None, mask = None, context_mask = None, name=None, inference=False, **kwargs):
|
||||
_, _, _, h, gh = *x.shape, self.heads, self.global_heads
|
||||
|
||||
cross_attend = exists(context)
|
||||
|
||||
context = default(context, x)
|
||||
context_mask = default(context_mask, mask) if not cross_attend else context_mask
|
||||
#print (torch.sum(self.name_embedding))
|
||||
q, k, v = self.to_q(x), self.to_k(context), self.to_v(context)
|
||||
|
||||
q, k, v = map(lambda t: rearrange(t, 'b n (h d) -> b h n d', h = h), (q, k, v))
|
||||
(q, lq), (k, lk), (v, lv) = map(lambda t: (t[:, :gh], t[:, gh:]), (q, k, v))
|
||||
|
||||
attn_outs = []
|
||||
#print (name)
|
||||
#print (self.name_embedding[name].size())
|
||||
if not empty(q):
|
||||
if exists(context_mask):
|
||||
global_mask = context_mask[:, None, :, None]
|
||||
v.masked_fill_(~global_mask, 0.)
|
||||
if cross_attend:
|
||||
pass
|
||||
#print (torch.sum(self.name_embedding))
|
||||
#out = self.fast_attention(q,self.name_embedding[name],None)
|
||||
#print (torch.sum(self.name_embedding[...,-1:]))
|
||||
else:
|
||||
out = self.fast_attention(q, k, v)
|
||||
attn_outs.append(out)
|
||||
|
||||
if not empty(lq):
|
||||
assert not cross_attend, 'local attention is not compatible with cross attention'
|
||||
out = self.local_attn(lq, lk, lv, input_mask = mask)
|
||||
attn_outs.append(out)
|
||||
|
||||
out = torch.cat(attn_outs, dim = 1)
|
||||
out = rearrange(out, 'b h n d -> b n (h d)')
|
||||
out = self.to_out(out)
|
||||
return self.dropout(out)
|
@ -10,8 +10,6 @@ from tqdm import tqdm
|
||||
|
||||
import diffusion.logger.utils as du
|
||||
|
||||
config_template = json.load(open("configs_template/config_template.json"))
|
||||
|
||||
pattern = re.compile(r'^[\.a-zA-Z0-9_\/]+$')
|
||||
|
||||
def get_wav_duration(file_path):
|
||||
@ -31,13 +29,16 @@ if __name__ == "__main__":
|
||||
parser.add_argument("--source_dir", type=str, default="./dataset/44k", help="path to source dir")
|
||||
parser.add_argument("--speech_encoder", type=str, default="vec768l12", help="choice a speech encoder|'vec768l12','vec256l9','hubertsoft','whisper-ppg','cnhubertlarge','dphubert','whisper-ppg-large','wavlmbase+'")
|
||||
parser.add_argument("--vol_aug", action="store_true", help="Whether to use volume embedding and volume augmentation")
|
||||
parser.add_argument("--tiny", action="store_true", help="Whether to train sovits tiny")
|
||||
args = parser.parse_args()
|
||||
|
||||
config_template = json.load(open("configs_template/config_tiny_template.json")) if args.tiny else json.load(open("configs_template/config_template.json"))
|
||||
train = []
|
||||
val = []
|
||||
idx = 0
|
||||
spk_dict = {}
|
||||
spk_id = 0
|
||||
|
||||
for speaker in tqdm(os.listdir(args.source_dir)):
|
||||
spk_dict[speaker] = spk_id
|
||||
spk_id += 1
|
||||
@ -98,6 +99,9 @@ if __name__ == "__main__":
|
||||
if args.vol_aug:
|
||||
config_template["train"]["vol_aug"] = config_template["model"]["vol_embedding"] = True
|
||||
|
||||
if args.tiny:
|
||||
config_template["model"]["filter_channels"] = 512
|
||||
|
||||
logger.info("Writing to configs/config.json")
|
||||
with open("configs/config.json", "w") as f:
|
||||
json.dump(config_template, f, indent=2)
|
||||
|
@ -28,11 +28,10 @@ hop_length = hps.data.hop_length
|
||||
speech_encoder = hps["model"]["speech_encoder"]
|
||||
|
||||
|
||||
def process_one(filename, hmodel,f0p,rank,diff=False,mel_extractor=None):
|
||||
def process_one(filename, hmodel, f0p, device, diff=False, mel_extractor=None):
|
||||
wav, sr = librosa.load(filename, sr=sampling_rate)
|
||||
audio_norm = torch.FloatTensor(wav)
|
||||
audio_norm = audio_norm.unsqueeze(0)
|
||||
device = torch.device(f"cuda:{rank}")
|
||||
soft_path = filename + ".soft.pt"
|
||||
if not os.path.exists(soft_path):
|
||||
wav16k = librosa.resample(wav, orig_sr=sampling_rate, target_sr=16000)
|
||||
@ -103,7 +102,8 @@ def process_one(filename, hmodel,f0p,rank,diff=False,mel_extractor=None):
|
||||
if not os.path.exists(aug_vol_path):
|
||||
np.save(aug_vol_path,aug_vol.to('cpu').numpy())
|
||||
|
||||
def process_batch(file_chunk, f0p, diff=False, mel_extractor=None):
|
||||
|
||||
def process_batch(file_chunk, f0p, diff=False, mel_extractor=None, device="cpu"):
|
||||
logger.info("Loading speech encoder for content...")
|
||||
rank = mp.current_process()._identity
|
||||
rank = rank[0] if len(rank) > 0 else 0
|
||||
@ -114,21 +114,22 @@ def process_batch(file_chunk, f0p, diff=False, mel_extractor=None):
|
||||
hmodel = utils.get_speech_encoder(speech_encoder, device=device)
|
||||
logger.info(f"Loaded speech encoder for rank {rank}")
|
||||
for filename in tqdm(file_chunk):
|
||||
process_one(filename, hmodel, f0p, gpu_id, diff, mel_extractor)
|
||||
process_one(filename, hmodel, f0p, device, diff, mel_extractor)
|
||||
|
||||
def parallel_process(filenames, num_processes, f0p, diff, mel_extractor):
|
||||
def parallel_process(filenames, num_processes, f0p, diff, mel_extractor, device):
|
||||
with ProcessPoolExecutor(max_workers=num_processes) as executor:
|
||||
tasks = []
|
||||
for i in range(num_processes):
|
||||
start = int(i * len(filenames) / num_processes)
|
||||
end = int((i + 1) * len(filenames) / num_processes)
|
||||
file_chunk = filenames[start:end]
|
||||
tasks.append(executor.submit(process_batch, file_chunk, f0p, diff, mel_extractor))
|
||||
tasks.append(executor.submit(process_batch, file_chunk, f0p, diff, mel_extractor, device=device))
|
||||
for task in tqdm(tasks):
|
||||
task.result()
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('-d', '--device', type=str, default=None)
|
||||
parser.add_argument(
|
||||
"--in_dir", type=str, default="dataset/44k", help="path to input dir"
|
||||
)
|
||||
@ -136,22 +137,27 @@ if __name__ == "__main__":
|
||||
'--use_diff',action='store_true', help='Whether to use the diffusion model'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--f0_predictor', type=str, default="dio", help='Select F0 predictor, can select crepe,pm,dio,harvest,rmvpe, default pm(note: crepe is original F0 using mean filter)'
|
||||
'--f0_predictor', type=str, default="dio", help='Select F0 predictor, can select crepe,pm,dio,harvest,rmvpe,fcpe|default: pm(note: crepe is original F0 using mean filter)'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--num_processes', type=int, default=1, help='You are advised to set the number of processes to the same as the number of CPU cores'
|
||||
)
|
||||
args = parser.parse_args()
|
||||
f0p = args.f0_predictor
|
||||
device = args.device
|
||||
if device is None:
|
||||
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
||||
|
||||
print(speech_encoder)
|
||||
logger.info("Using " + speech_encoder + " SpeechEncoder")
|
||||
logger.info("Using " + f0p + "f0 extractor")
|
||||
logger.info("Using diff Mode:")
|
||||
print(args.use_diff)
|
||||
logger.info("Using device: ", device)
|
||||
logger.info("Using SpeechEncoder: " + speech_encoder)
|
||||
logger.info("Using extractor: " + f0p)
|
||||
logger.info("Using diff Mode: " + str( args.use_diff))
|
||||
|
||||
if args.use_diff:
|
||||
print("use_diff")
|
||||
print("Loading Mel Extractor...")
|
||||
mel_extractor = Vocoder(dconfig.vocoder.type, dconfig.vocoder.ckpt, device = "cuda:0")
|
||||
mel_extractor = Vocoder(dconfig.vocoder.type, dconfig.vocoder.ckpt, device=device)
|
||||
print("Loaded Mel Extractor.")
|
||||
else:
|
||||
mel_extractor = None
|
||||
@ -162,5 +168,5 @@ if __name__ == "__main__":
|
||||
num_processes = args.num_processes
|
||||
if num_processes == 0:
|
||||
num_processes = os.cpu_count()
|
||||
|
||||
parallel_process(filenames, num_processes, f0p, args.use_diff, mel_extractor)
|
||||
|
||||
parallel_process(filenames, num_processes, f0p, args.use_diff, mel_extractor, device)
|
||||
|
@ -26,4 +26,6 @@ edge_tts
|
||||
langdetect
|
||||
pyyaml
|
||||
pynvml
|
||||
faiss-cpu
|
||||
faiss-cpu
|
||||
einops
|
||||
local_attention
|
3
utils.py
3
utils.py
@ -101,6 +101,9 @@ def get_f0_predictor(f0_predictor,hop_length,sampling_rate,**kargs):
|
||||
elif f0_predictor == "rmvpe":
|
||||
from modules.F0Predictor.RMVPEF0Predictor import RMVPEF0Predictor
|
||||
f0_predictor_object = RMVPEF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate,dtype=torch.float32 ,device=kargs["device"],threshold=kargs["threshold"])
|
||||
elif f0_predictor == "fcpe":
|
||||
from modules.F0Predictor.FCPEF0Predictor import FCPEF0Predictor
|
||||
f0_predictor_object = FCPEF0Predictor(hop_length=hop_length,sampling_rate=sampling_rate,dtype=torch.float32 ,device=kargs["device"],threshold=kargs["threshold"])
|
||||
else:
|
||||
raise Exception("Unknown f0 predictor")
|
||||
return f0_predictor_object
|
||||
|
Loading…
Reference in New Issue
Block a user