# Source: https://github.com/SeanNaren/deepspeech.pytorch
import math

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchaudio.functional import magphase

from data.audio.unsupervised_audio_dataset import load_audio
from models.deepspeech.decoder import GreedyDecoder
from trainer.networks import register_model


class SequenceWise(nn.Module):
    def __init__(self, module):
        """
        Collapses input of dim T*N*H to (T*N)*H, and applies the given module.
        Allows handling of variable sequence lengths and minibatch sizes.
        :param module: Module to apply input to.
        """
        super(SequenceWise, self).__init__()
        self.module = module

    def forward(self, x):
        t, n = x.size(0), x.size(1)
        x = x.view(t * n, -1)
        x = self.module(x)
        x = x.view(t, n, -1)
        return x

    def __repr__(self):
        tmpstr = self.__class__.__name__ + ' (\n'
        tmpstr += self.module.__repr__()
        tmpstr += ')'
        return tmpstr


class MaskConv(nn.Module):
    def __init__(self, seq_module):
        """
        Zeroes the output of each submodule past the given sequence lengths. This ensures that the
        results of the model do not change when batch sizes change during inference.
        Input needs to be in the shape of (BxCxDxT).
        :param seq_module: The sequential module containing the conv stack.
        """
        super(MaskConv, self).__init__()
        self.seq_module = seq_module

    def forward(self, x, lengths):
        """
        :param x: The input of size BxCxDxT
        :param lengths: The actual length of each sequence in the batch
        :return: Masked output from the module
        """
        for module in self.seq_module:
            x = module(x)
            mask = torch.BoolTensor(x.size()).fill_(0)
            if x.is_cuda:
                mask = mask.cuda()
            for i, length in enumerate(lengths):
                length = length.item()
                if (mask[i].size(2) - length) > 0:
                    # Mark everything past this sequence's true length along T for zeroing.
                    mask[i].narrow(2, length, mask[i].size(2) - length).fill_(1)
            x = x.masked_fill(mask, 0)
        return x, lengths


class InferenceBatchSoftmax(nn.Module):
    def forward(self, input_):
        if not self.training:
            return F.softmax(input_, dim=-1)
        else:
            return input_


class BatchRNN(nn.Module):
    def __init__(self, input_size, hidden_size, rnn_type=nn.LSTM, bidirectional=False, batch_norm=True):
        super(BatchRNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bidirectional = bidirectional
        self.batch_norm = SequenceWise(nn.BatchNorm1d(input_size)) if batch_norm else None
        self.rnn = rnn_type(input_size=input_size, hidden_size=hidden_size,
                            bidirectional=bidirectional, bias=True)
        self.num_directions = 2 if bidirectional else 1

    def flatten_parameters(self):
        self.rnn.flatten_parameters()

    def forward(self, x, output_lengths):
        if self.batch_norm is not None:
            x = self.batch_norm(x)
        x = nn.utils.rnn.pack_padded_sequence(x, output_lengths)
        x, h = self.rnn(x)
        x, _ = nn.utils.rnn.pad_packed_sequence(x)
        if self.bidirectional:
            # (TxNxH*2) -> (TxNxH) by summing the two directions.
            x = x.view(x.size(0), x.size(1), 2, -1).sum(2).view(x.size(0), x.size(1), -1)
        return x


class Lookahead(nn.Module):
    # Wang et al 2016 - Lookahead Convolution Layer for Unidirectional Recurrent Neural Networks
    # input shape - sequence, batch, feature - TxNxH
    # output shape - same as input
    def __init__(self, n_features, context):
        super(Lookahead, self).__init__()
        assert context > 0
        self.context = context
        self.n_features = n_features
        self.pad = (0, self.context - 1)
        self.conv = nn.Conv1d(
            self.n_features,
            self.n_features,
            kernel_size=self.context,
            stride=1,
            groups=self.n_features,
            padding=0,
            bias=False
        )

    def forward(self, x):
        x = x.transpose(0, 1).transpose(1, 2)  # TxNxH -> NxHxT for Conv1d
        x = F.pad(x, pad=self.pad, value=0)    # Right-pad T so each step sees `context - 1` future frames.
        x = self.conv(x)
        x = x.transpose(1, 2).transpose(0, 1).contiguous()  # Back to TxNxH
        return x

    def __repr__(self):
        return self.__class__.__name__ + '(' \
               + 'n_features=' + str(self.n_features) \
               + ', context=' + str(self.context) + ')'
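
# Hedged illustration (added; not in the original file): a quick shape/behavior check for
# Lookahead. With context=C, each output frame t is a per-channel weighted combination of
# input frames t..t+C-1, so the layer sees C-1 future frames without any recurrence, and
# the output shape matches the input (TxNxH).
def _lookahead_shape_check():
    la = Lookahead(n_features=8, context=3)
    x = torch.randn(50, 4, 8)  # TxNxH
    y = la(x)
    assert y.shape == x.shape
    return y
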
class DeepSpeech(nn.Module):
    def __init__(self,
                 hidden_size: int = 1024,
                 hidden_layers: int = 5,
                 lookahead_context: int = 20,
                 bidirectional: bool = True,
                 sample_rate: int = 16000,
                 window_size: float = .02,
                 window_stride: float = .01):
        super().__init__()
        self.bidirectional = bidirectional
        self.sample_rate = sample_rate
        self.window_size = window_size
        self.window_stride = window_stride
        self.labels = [
            "_", "'", "A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K", "L",
            "M", "N", "O", "P", "Q", "R", "S", "T", "U", "V", "W", "X", "Y", "Z", " "
        ]
        num_classes = len(self.labels)

        self.conv = MaskConv(nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=(41, 11), stride=(2, 2), padding=(20, 5)),
            nn.BatchNorm2d(32),
            nn.Hardtanh(0, 20, inplace=True),
            nn.Conv2d(32, 32, kernel_size=(21, 11), stride=(2, 1), padding=(10, 5)),
            nn.BatchNorm2d(32),
            nn.Hardtanh(0, 20, inplace=True)
        ))
        # Based on the above convolutions and the spectrogram size, using the conv output
        # formula (W - F + 2P) / S + 1. With the 16kHz / 20ms defaults this works out to
        # 161 -> 81 -> 41 frequency bins, times 32 channels = 1312.
        rnn_input_size = int(math.floor((sample_rate * window_size) / 2) + 1)
        rnn_input_size = int(math.floor(rnn_input_size + 2 * 20 - 41) / 2 + 1)
        rnn_input_size = int(math.floor(rnn_input_size + 2 * 10 - 21) / 2 + 1)
        rnn_input_size *= 32

        self.rnns = nn.Sequential(
            BatchRNN(
                input_size=rnn_input_size,
                hidden_size=hidden_size,
                rnn_type=nn.LSTM,
                bidirectional=self.bidirectional,
                batch_norm=False
            ),
            *(
                BatchRNN(
                    input_size=hidden_size,
                    hidden_size=hidden_size,
                    rnn_type=nn.LSTM,
                    bidirectional=self.bidirectional
                ) for _ in range(hidden_layers - 1)
            )
        )

        self.lookahead = nn.Sequential(
            # consider adding batch norm?
            Lookahead(hidden_size, context=lookahead_context),
            nn.Hardtanh(0, 20, inplace=True)
        ) if not self.bidirectional else None

        fully_connected = nn.Sequential(
            nn.BatchNorm1d(hidden_size),
            nn.Linear(hidden_size, num_classes, bias=False)
        )
        self.fc = nn.Sequential(
            SequenceWise(fully_connected),
        )
        self.inference_softmax = InferenceBatchSoftmax()
        self.evaluation_decoder = GreedyDecoder(self.labels)  # Decoder used for inference.

    def forward(self, wav, lengths=None):
        if lengths is None:
            lengths = torch.tensor([wav.shape[-1] for _ in range(wav.shape[0])], dtype=torch.int32, device=wav.device)
        x = self.audio_to_spectrogram(wav)
        # Scale lengths from samples to spectrogram frames (the divisor is the effective hop; 160 with the defaults).
        lengths = (lengths // math.ceil(wav.shape[-1] / x.shape[-1])).cpu().int()
        output_lengths = self.get_seq_lens(lengths)
        x, _ = self.conv(x, output_lengths)

        sizes = x.size()
        x = x.view(sizes[0], sizes[1] * sizes[2], sizes[3])  # Collapse feature dimension
        x = x.transpose(1, 2).transpose(0, 1).contiguous()  # TxNxH

        for rnn in self.rnns:
            x = rnn(x, output_lengths)

        if not self.bidirectional:  # no need for lookahead layer in bidirectional
            x = self.lookahead(x)

        x = self.fc(x)
        x = x.transpose(0, 1)
        #x = self.inference_softmax(x) <-- doesn't work?
        return x, output_lengths

    def infer(self, inputs, lengths=None):
        out, output_sizes = self(inputs, lengths)
        decoded_output, _ = self.evaluation_decoder.decode(out, output_sizes)
        return decoded_output
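
    # Added note (hedged worked example): get_seq_lens() below applies the standard conv
    # output-length formula along the time axis only. For the conv stack above, the first
    # Conv2d has (kernel, stride, padding) = (11, 2, 5) in time, so
    #     T_out = (T + 2*5 - (11 - 1) - 1) // 2 + 1 = (T - 1) // 2 + 1,
    # while the second has stride 1 with matching padding and leaves T unchanged. The
    # network therefore emits roughly one output frame per two spectrogram frames.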
    def get_seq_lens(self, input_length):
        """
        Given a 1D Tensor or Variable containing integer sequence lengths, return a 1D tensor or variable
        containing the sequence lengths that will be output by the network.
        :param input_length: 1D Tensor
        :return: 1D Tensor scaled by model
        """
        seq_len = input_length
        for m in self.conv.modules():
            if type(m) == nn.modules.conv.Conv2d:
                seq_len = ((seq_len + 2 * m.padding[1] - m.dilation[1] * (m.kernel_size[1] - 1) - 1) // m.stride[1] + 1)
        return seq_len.int()

    def audio_to_spectrogram(self, y):
        if len(y.shape) == 3:
            assert y.shape[1] == 1
            y = y.squeeze(1)
        n_fft = int(self.sample_rate * self.window_size)
        win_length = n_fft
        hop_length = int(self.sample_rate * self.window_stride)
        # STFT. return_complex=False keeps the (..., 2) real layout that magphase() expects;
        # newer PyTorch versions require this argument to be passed explicitly.
        D = torch.stft(y, n_fft=n_fft, hop_length=hop_length, win_length=win_length,
                       window=torch.hamming_window(win_length, device=y.device),
                       return_complex=False)
        spect, phase = magphase(D)
        # S = log(S + 1)
        spect = torch.log1p(spect)
        return spect.unsqueeze(1)  # Deepspeech operates in a 2D spectrogram regime.


@register_model
def register_deepspeech(opt_net, opt):
    return DeepSpeech(**opt_net['kwargs'])


# Test for a ~4 second audio clip, loaded at 16kHz.
if __name__ == '__main__':
    clip = load_audio('D:\\data\\audio\\libritts\\test-clean\\1089\\134686\\1089_134686_000008_000000.wav', 16000).cuda()
    model = DeepSpeech().cuda()
    model.eval()
    sd = torch.load('\\\\192.168.5.3\\rtx3080_drv\\deepspeech.pytorch\\checkpoint_sd.pth')
    with torch.no_grad():
        model.load_state_dict(sd)
    print(model(clip)[0].shape)
    print(model.infer(clip))
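

# Hedged usage sketch (added; not in the original file): exercises the untrained model on a
# synthetic batch, so no checkpoint, GPU, or dataset path is needed. Assumes a torch/torchaudio
# version where torchaudio.functional.magphase is still available.
def _smoke_test():
    model = DeepSpeech()
    model.eval()
    wav = torch.randn(2, 1, 4 * 16000)  # two ~4s clips at the 16kHz default
    with torch.no_grad():
        logits, output_lengths = model(wav)
    # logits: (N, T', num_classes) CTC frames; output_lengths: valid frame count per item.
    print(logits.shape, output_lengths)
    print(model.infer(wav))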