
A Code Walkthrough of U-Net Vocal Separation

Author: 莹子说她想吃烤冷面 | Posted 2019-10-15 11:10

    This is not my own re-implementation of the paper; it is code I found online and then modified.

    Dependencies

    • Python 3.5
    • Chainer 3.0: a flexible neural network framework that makes it simple and intuitive to write complex networks. Chainer uses a "define-by-run" scheme: the network is defined on the fly as the forward computation actually runs.
    • librosa 0.5.0: a Python library for audio processing
    • cupy 2.0: a library that implements NumPy-compatible arrays on NVIDIA GPUs via the CUDA GPU libraries

    Code

    Preprocessing: ProcessDSD.py

    Dataset preprocessing: converts the audio files of the DSD100 dataset into time-frequency spectrograms.
    DSD100 contains two folders: "Mixtures" holds the mixed audio, and "Sources" holds the separate stems for vocals, drums, bass, and the remaining instruments. Each of the two contains two subfolders: "Dev" is the training set and "Test" is the test set.
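
    For reference, the directory layout is roughly as follows (DSD100 ships 50 songs in each split):

    DSD100/
        Mixtures/
            Dev/<song>/mixture.wav
            Test/<song>/mixture.wav
        Sources/
            Dev/<song>/{bass,drums,other,vocals}.wav
            Test/<song>/{bass,drums,other,vocals}.wav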

    import numpy as np
    from librosa.core import load
    import util
    import os
    
    PATH_DSD_SOURCE = ["DSD100/Sources/Dev", "DSD100/Sources/Test"]
    PATH_DSD_MIXTURE = ["DSD100/Mixtures/Dev", "DSD100/Mixtures/Test"]
    
    FILE_MIX = "mixture.wav"
    FILE_BASS = "bass.wav"
    FILE_DRUMS = "drums.wav"
    FILE_OTHER = "other.wav"
    FILE_VOCAL = "vocals.wav"
    
    
    list_source_dir = [os.path.join(PATH_DSD_SOURCE[0], f)
                       for f in os.listdir(PATH_DSD_SOURCE[0])]
    list_source_dir.extend([os.path.join(PATH_DSD_SOURCE[1], f)
                            for f in os.listdir(PATH_DSD_SOURCE[1])])
    list_source_dir = sorted(list_source_dir)
    
    list_mix_dir = [os.path.join(PATH_DSD_MIXTURE[0], f)
                    for f in os.listdir(PATH_DSD_MIXTURE[0])]
    list_mix_dir.extend([os.path.join(PATH_DSD_MIXTURE[1], f)
                         for f in os.listdir(PATH_DSD_MIXTURE[1])])
    list_mix_dir = sorted(list_mix_dir)
    
    
    # Pair each mixture folder with its matching stems folder; the instrumental
    # signal is reconstructed as drums + bass + other.
    for mix_dir, source_dir in zip(list_mix_dir,  list_source_dir):
        assert(mix_dir.split("/")[-1] == source_dir.split("/")[-1])
        fname = mix_dir.split("/")[-1]
        print("Processing: " + fname)
        y_mix, sr = load(os.path.join(mix_dir, FILE_MIX), sr=None)
        y_vocal, _ = load(os.path.join(source_dir, FILE_VOCAL), sr=None)
        y_inst = sum([load(os.path.join(source_dir, f), sr=None)[0]
                      for f in [FILE_DRUMS, FILE_BASS, FILE_OTHER]])
    
        assert(y_mix.shape == y_vocal.shape)
        assert(y_mix.shape == y_inst.shape)
    
        util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)
    
    
    # Data augmentation: build 50 extra training examples by mixing stems
    # drawn from randomly chosen songs.
    rand_voc = np.random.randint(100, size=50)
    rand_bass = np.random.randint(100, size=50)
    rand_drums = np.random.randint(100, size=50)
    rand_other = np.random.randint(100, size=50)
    
    count = 1
    print("Generating random mix...")
    for i_voc, i_bass, i_drums, i_other in \
            zip(rand_voc, rand_bass, rand_drums, rand_other):
        y_vocal, _ = load(os.path.join(list_source_dir[i_voc], FILE_VOCAL), sr=None)
        y_bass, _ = load(os.path.join(list_source_dir[i_bass], FILE_BASS), sr=None)
        y_drums, _ = load(os.path.join(list_source_dir[i_drums], FILE_DRUMS), sr=None)
        y_other, _ = load(os.path.join(list_source_dir[i_other], FILE_OTHER), sr=None)
    
        minsize = min([y_vocal.size, y_bass.size, y_drums.size, y_other.size])
    
        y_vocal = y_vocal[:minsize]
        y_inst = y_bass[:minsize] + y_drums[:minsize] + y_other[:minsize]
        y_mix = y_vocal + y_inst
    
        fname = "dsd_random%02d" % count
        util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname)
        print("Saved:" + fname)
        count += 1
    

    The key call here is util.SaveSpectrogram(y_mix, y_vocal, y_inst, fname).

    Main entry point: DoExperiment.py

    Given the path to an audio file, train a model (or use an existing one) and obtain the separated vocal / instrumental tracks from the original audio.

    
    """
    Code example for training U-Net
    """
    import network
    import util
    
    Xlist,Ylist = util.LoadDataset(target="vocal")
    print("Dataset loaded.")
    network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30)
    
    
    """
    Code example for performing vocal separation with U-Net
    """
    import util
    
    fname = "Say Hello.mp3"
    mag, phase = util.LoadAudio(fname)
    start = 1024
    end = 1024+256
    
    mask = util.ComputeMask(
        mag[:, start:end],
        unet_model="/Users/yanyingzi/Study/Signal Seperation/UNet-VocalSeparation-Chainer-master/unet.model",
        hard=False)
    
    util.SaveAudio(
        "vocal-%s" % fname, mag[:, start:end]*mask, phase[:, start:end])
    util.SaveAudio(
        "inst-%s" % fname, mag[:, start:end]*(1-mask), phase[:, start:end])
    util.SaveAudio(
        "orig-%s" % fname, mag[:, start:end], phase[:, start:end])
    

    Five main interfaces are called:

    util.LoadDataset(target) loads the dataset; target is the source to separate, here "vocal"
    network.TrainUNet(Xlist,Ylist,savefile="unet.model",epoch=30) trains the network
    util.LoadAudio(fname) loads an audio file
    util.ComputeMask(input_mag, unet_model="unet.model", hard=True) computes the mask
    util.SaveAudio(fname, mag, phase) saves audio
    (If a trained model already exists, only the last three are needed.)

    util.py

    def LoadDataset(target="vocal"): # load the time-frequency spectrograms produced by ProcessDSD.py
        filelist_fft = find_files(C.PATH_FFT, ext="npz")[:200]
        Xlist = []
        Ylist = []
        for file_fft in filelist_fft:
            dat = np.load(file_fft)
            Xlist.append(dat["mix"])
            if target == "vocal":
                assert(dat["mix"].shape == dat["vocal"].shape)
                Ylist.append(dat["vocal"])
            else:
                assert(dat["mix"].shape == dat["inst"].shape)
                Ylist.append(dat["inst"])
        return Xlist, Ylist
    

    The returned Xlist holds the spectrograms of the mixed audio; Ylist holds the spectrograms of the target stem.
    find_files is librosa.util.find_files.

    def network.TrainUNet(): # see network.py below
    
    def LoadAudio(fname):
        y, sr = load(fname, sr=C.SR)  # sr: sampling rate, C.SR = 16000
        spec = stft(y, n_fft=C.FFT_SIZE, hop_length=C.H, win_length=C.FFT_SIZE)  # C.FFT_SIZE = 1024, C.H = 512
        mag = np.abs(spec)
        mag /= np.max(mag)  # normalize magnitudes to [0, 1]
        phase = np.exp(1.j*np.angle(spec))
        return mag, phase
    

    load is librosa.core.load, which reads audio in a variety of formats; a usage example follows the parameter list below.

    • Parameters:
      path: path to the audio file
      sr: sampling rate (need not be the file's native rate; load can resample)
      mono: if True, downmix to a single channel; otherwise keep two channels
      offset: start time for reading, so loading need not begin at the start of the file
      duration: how much audio to load; together with offset this reads out a chosen segment
      dtype: data type of the returned signal values, usually left at the default
      res_type: resampling type, usually left at the default
    • Returns:
      y: the audio signal values, a one-dimensional numpy array
      sr: the sampling rate; if the sr parameter was not set, this is the file's native rate
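
    For example, a hypothetical call (the file name is made up):

    from librosa.core import load

    # Read 5 seconds starting at 0:30, downmixed to mono, resampled to 16 kHz.
    y, sr = load("song.mp3", sr=16000, mono=True, offset=30.0, duration=5.0)
    print(y.shape, sr)  # (80000,) 16000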

    stft is librosa.core.stft, the short-time Fourier transform; a shape check follows the parameter list.

    • Parameters:
      y: the signal values
      n_fft: number of samples per FFT window (a power of 2 is recommended)
      hop_length: hop size, the number of samples between adjacent frames
      win_length: window length, at most n_fft (defaults to win_length = n_fft, i.e. the whole frame)
      window: window type, e.g. a Hann or Hamming window
      center: whether each frame is centered on its timestamp or starts there
      dtype: data type of the returned values, usually left at the default
      pad_mode: how the signal edges are padded
    • Returns:
      D: the matrix after the short-time Fourier transform (the time-frequency spectrogram), with D.shape = (number of frequency bins, number of time frames) = (1 + n_fft/2, n_frames)
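
    A quick shape check (a standalone sketch, not from the repository):

    import numpy as np
    from librosa.core import stft

    y = np.random.randn(16000).astype(np.float32)  # 1 second of noise at 16 kHz
    D = stft(y, n_fft=1024, hop_length=512)
    print(D.shape)  # (513, 32): 1 + 1024/2 frequency bins, 1 + 16000//512 frames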

    (At first I could not see why the number of frequency bins is 1 + n_fft/2.)
    An explanation found online: "The continuous Fourier transform possesses symmetries when computed on real signals (Hermitian symmetry). The discrete version, an FFT (of even length) possesses a slightly twisted symmetry." That is, the FFT of a real signal is symmetric in frequency, with the content "duplicated" in positive and negative frequencies: X[N-k] is the complex conjugate of X[k], so for an even length N only the bins k = 0, 1, ..., N/2 are unique, which is exactly 1 + n_fft/2 of them.
    (The phase line also puzzled me at first; see below.)

    mag is the magnitude, the absolute value of the complex spectrogram.
    phase = np.exp(1.j*np.angle(spec)) keeps only the complex angle of spec: every entry is a unit-modulus complex number, so mag * phase recovers the original complex spectrogram.
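
    Both points can be checked with a few lines of numpy (a standalone check, independent of the repository):

    import numpy as np

    n_fft = 1024
    y = np.random.randn(n_fft)            # one real-valued frame
    spec = np.fft.fft(y)

    # Hermitian symmetry: X[N-k] == conj(X[k]) for real input, so only the
    # first 1 + n_fft/2 bins carry unique information.
    assert np.allclose(spec[1:], np.conj(spec[:0:-1]))
    print(np.fft.rfft(y).shape)           # (513,) == (1 + n_fft/2,)

    # The magnitude/phase split used in LoadAudio:
    mag = np.abs(spec)
    phase = np.exp(1.j * np.angle(spec))  # unit-modulus phase factors
    assert np.allclose(mag * phase, spec)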

    def SaveAudio(fname, mag, phase):
        y = istft(mag*phase, hop_length=C.H, win_length=C.FFT_SIZE)
        write_wav(fname, y, C.SR, norm=True)
    

    istft is librosa.core.istft.
    write_wav is librosa.output.write_wav.

    def SaveSpectrogram(y_mix, y_vocal, y_inst, fname, original_sr=44100):
        y_mix = resample(y_mix, original_sr, C.SR)
        y_vocal = resample(y_vocal, original_sr, C.SR)
        y_inst = resample(y_inst, original_sr, C.SR)
    
        S_mix = np.abs(
            stft(y_mix, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
        S_vocal = np.abs(
            stft(y_vocal, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
        S_inst = np.abs(
            stft(y_inst, n_fft=C.FFT_SIZE, hop_length=C.H)).astype(np.float32)
    
        norm = S_mix.max()
        S_mix /= norm
        S_vocal /= norm
        S_inst /= norm
    
        np.savez(os.path.join(C.PATH_FFT, fname+".npz"),
                 mix=S_mix, vocal=S_vocal, inst=S_inst)
    

    stft is librosa.core.stft, the short-time Fourier transform. SaveSpectrogram is used while preprocessing the dataset: it converts the dataset's audio into spectrograms. Note that all three spectrograms are divided by the same constant, the maximum of the mixture spectrogram, so the relative levels of the mixture and the stems are preserved.

    def ComputeMask(input_mag, unet_model="unet.model", hard=True):
        unet = network.UNet()
        unet.load(unet_model)
        config.train = False
        config.enable_backprop = False
        mask = unet(input_mag[np.newaxis, np.newaxis, 1:, :]).data[0, 0, :, :]
        mask = np.vstack((np.zeros(mask.shape[1], dtype="float32"), mask))
        if hard:  # hard mask
            hard_mask = np.zeros(mask.shape, dtype="float32")
            hard_mask[mask > 0.5] = 1
            return hard_mask
        else:    # soft mask
            return mask
    

    network.UNet() instantiates the U-Net, loads the trained model, and uses it to compute a hard mask or a soft mask. The input is sliced with [1:, :] to drop the 0 Hz bin, so the network sees 512 frequency bins (the height it was trained on); a row of zeros is then stacked back on top so that the mask lines up with the 513-bin spectrogram again.
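
    The difference between the two mask types, as a toy numpy sketch (the values are made up for illustration):

    import numpy as np

    mag = np.array([[0.9, 0.2],
                    [0.4, 0.7]], dtype="float32")     # mixture magnitudes
    mask = np.array([[0.8, 0.1],
                     [0.45, 0.95]], dtype="float32")  # network output in (0, 1)

    soft_vocal = mag * mask                     # soft mask: scale every bin
    hard_mask = (mask > 0.5).astype("float32")  # hard mask: binarize at 0.5
    hard_vocal = mag * hard_mask                # keep or zero each bin
    inst = mag * (1 - mask)                     # complementary mask -> instrumental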

    network.py

    UNet class

    from chainer import Chain, serializers, optimizers, cuda, config
    import chainer.links as L
    import chainer.functions as F
    import numpy as np
    import const   # defines PATCH_LENGTH, BATCH_SIZE, ... (used by TrainUNet below)

    cp = cuda.cupy  # CuPy array module; TrainUNet below assumes a GPU
    
    class UNet(Chain):
        def __init__(self):
            super(UNet, self).__init__()
            with self.init_scope():
                self.conv1 = L.Convolution2D(1, 16, 4, 2, 1)
                self.norm1 = L.BatchNormalization(16)
                self.conv2 = L.Convolution2D(16, 32, 4, 2, 1)
                self.norm2 = L.BatchNormalization(32)
                self.conv3 = L.Convolution2D(32, 64, 4, 2, 1)
                self.norm3 = L.BatchNormalization(64)
                self.conv4 = L.Convolution2D(64, 128, 4, 2, 1)
                self.norm4 = L.BatchNormalization(128)
                self.conv5 = L.Convolution2D(128, 256, 4, 2, 1)
                self.norm5 = L.BatchNormalization(256)
                self.conv6 = L.Convolution2D(256, 512, 4, 2, 1)
                self.norm6 = L.BatchNormalization(512)
                self.deconv1 = L.Deconvolution2D(512, 256, 4, 2, 1)
                self.denorm1 = L.BatchNormalization(256)
                self.deconv2 = L.Deconvolution2D(512, 128, 4, 2, 1)
                self.denorm2 = L.BatchNormalization(128)
                self.deconv3 = L.Deconvolution2D(256, 64, 4, 2, 1)
                self.denorm3 = L.BatchNormalization(64)
                self.deconv4 = L.Deconvolution2D(128, 32, 4, 2, 1)
                self.denorm4 = L.BatchNormalization(32)
                self.deconv5 = L.Deconvolution2D(64, 16, 4, 2, 1)
                self.denorm5 = L.BatchNormalization(16)
                self.deconv6 = L.Deconvolution2D(32, 1, 4, 2, 1)
    
        def __call__(self, X):

            # Encoder: each conv has kernel 4, stride 2, pad 1, so every layer
            # halves both the frequency and the time dimension. The print calls
            # trace the feature-map shapes.
            print(X.shape)

            h1 = F.leaky_relu(self.norm1(self.conv1(X)))
            print(h1.shape)
            h2 = F.leaky_relu(self.norm2(self.conv2(h1)))
            print(h2.shape)
            h3 = F.leaky_relu(self.norm3(self.conv3(h2)))
            print(h3.shape)
            h4 = F.leaky_relu(self.norm4(self.conv4(h3)))
            print(h4.shape)
            h5 = F.leaky_relu(self.norm5(self.conv5(h4)))
            print(h5.shape)
            h6 = F.leaky_relu(self.norm6(self.conv6(h5)))
            print(h6.shape)
            # Decoder: each deconv doubles the spatial dimensions; F.concat
            # realizes the U-Net skip connections with the encoder features.
            dh = F.relu(F.dropout(self.denorm1(self.deconv1(h6))))
            print(dh.shape)
            dh = F.relu(F.dropout(self.denorm2(self.deconv2(F.concat((dh, h5))))))
            print(dh.shape)
            dh = F.relu(F.dropout(self.denorm3(self.deconv3(F.concat((dh, h4))))))
            print(dh.shape)
            dh = F.relu(self.denorm4(self.deconv4(F.concat((dh, h3)))))
            print(dh.shape)
            dh = F.relu(self.denorm5(self.deconv5(F.concat((dh, h2)))))
            print(dh.shape)
            # Sigmoid squashes the output into (0, 1): a soft mask.
            dh = F.sigmoid(self.deconv6(F.concat((dh, h1))))
            print(dh.shape)
            return dh
    
        def load(self, fname="unet.model"):
            serializers.load_npz(fname, self)
    
        def save(self, fname="unet.model"):
            serializers.save_npz(fname, self)
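
    The decoder layers deconv2 through deconv6 take twice as many input channels as the matching encoder layers emit, because the skip features are concatenated on before each deconvolution. A quick sanity check on a dummy 512x128 patch (a sketch, assuming CPU execution; 512x128 matches the training patches below):

    import numpy as np
    from chainer import config

    config.train = False            # disable dropout for inference
    config.enable_backprop = False

    unet = UNet()
    X = np.zeros((1, 1, 512, 128), dtype="float32")  # (batch, channel, freq, time)
    mask = unet(X)
    print(mask.shape)  # (1, 1, 512, 128): same shape as the input, values in (0, 1)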
    
    

    TrainUNet:

    class UNetTrainmodel(Chain):
        def __init__(self, unet):
            super(UNetTrainmodel, self).__init__()
            with self.init_scope():
                self.unet = unet
    
        def __call__(self, X, Y):
            O = self.unet(X)                            # O is the predicted mask
            self.loss = F.mean_absolute_error(X*O, Y)   # L1 loss between masked mix and target
            return self.loss
    
    
    def TrainUNet(Xlist, Ylist, epoch=40, savefile="unet.model"):
        assert(len(Xlist) == len(Ylist))
        unet = UNet()
        model = UNetTrainmodel(unet)
        model.to_gpu(0)
        opt = optimizers.Adam()
        opt.setup(model)
        config.train = True
        config.enable_backprop = True
        itemcnt = len(Xlist)
        itemlength = [x.shape[1] for x in Xlist]  # number of time frames per song
        # Minibatches per epoch: enough random patches to cover the data about 4 times.
        subepoch = sum(itemlength) // const.PATCH_LENGTH // const.BATCH_SIZE * 4
        for ep in range(epoch):
            sum_loss = 0.0
            for subep in range(subepoch):
                X = np.zeros((const.BATCH_SIZE, 1, 512, const.PATCH_LENGTH),
                             dtype="float32")
                Y = np.zeros((const.BATCH_SIZE, 1, 512, const.PATCH_LENGTH),
                             dtype="float32")
                idx_item = np.random.randint(0, itemcnt, const.BATCH_SIZE)
                for i in range(const.BATCH_SIZE):
                    # Crop a random PATCH_LENGTH-frame patch from a random song;
                    # the DC bin (row 0) is dropped so the patch height is 512.
                    randidx = np.random.randint(
                        itemlength[idx_item[i]]-const.PATCH_LENGTH-1)
                    X[i, 0, :, :] = \
                        Xlist[idx_item[i]][1:, randidx:randidx+const.PATCH_LENGTH]
                    Y[i, 0, :, :] = \
                        Ylist[idx_item[i]][1:, randidx:randidx+const.PATCH_LENGTH]
                opt.update(model, cp.asarray(X), cp.asarray(Y))
                sum_loss += model.loss.data * const.BATCH_SIZE
    
            print("epoch: %d/%d  loss=%.3f" % (ep+1, epoch, sum_loss))
    
        unet.save(savefile)
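
    const.py itself is not shown in the post; util.py apparently does "import const as C". From the code it must define at least the following names. SR, FFT_SIZE and H match the inline comments in util.py; PATH_FFT, PATCH_LENGTH and BATCH_SIZE are assumed values for illustration only:

    # const.py -- a plausible reconstruction, not the author's file
    SR = 16000                 # sampling rate
    FFT_SIZE = 1024            # STFT window -> 1 + 1024/2 = 513 frequency bins
    H = 512                    # hop length
    PATH_FFT = "Spectrogram"   # directory holding the .npz spectrograms (assumed)
    PATCH_LENGTH = 128         # time frames per training patch (assumed)
    BATCH_SIZE = 64            # minibatch size (assumed)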
    

    Summary

    The U-Net here is simply a mapping whose input and output spaces coincide. During training the input is (MixSpec, TargetSpec) = (mixture spectrogram, target spectrogram); Adam optimizes the parameters θ so that, with mask = UNet_θ(MixSpec), the product MixSpec * mask comes as close as possible to TargetSpec.
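
    Written as an objective (notation mine, matching F.mean_absolute_error(X*O, Y) above; M is the mixture magnitude spectrogram, T the target):

    \min_\theta \; \mathbb{E}_{(M,\,T)} \big[ \, \lVert M \odot \mathrm{UNet}_\theta(M) - T \rVert_1 \, \big]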
