2021-12-22 20:44:11 +00:00
|
|
|
import torch
|
|
|
|
import torch.nn as nn
|
|
|
|
import torch.nn.functional as F
|
|
|
|
from einops import rearrange
|
|
|
|
from torch import einsum
|
|
|
|
|
|
|
|
from models.lucidrains.dalle.transformer import Transformer
|
|
|
|
from trainer.networks import register_model
|
|
|
|
from utils.util import opt_get
|
|
|
|
|
|
|
|
|
|
|
|
def exists(val):
    """Return True when `val` is anything other than None (falsy values such as 0 or '' still count)."""
    is_missing = val is None
    return not is_missing
|
|
|
|
|
|
|
|
|
|
|
|
def masked_mean(t, mask, dim = 1):
    """
    Average `t` along `dim`, counting only the positions where `mask` is True.

    :param t: Tensor indexed as [batch, position, feature]; the mask is broadcast over the last axis.
    :param mask: Boolean tensor indexed as [batch, position]. False positions are zeroed before summing.
    :param dim: Axis that is reduced in both `t` and `mask`. Defaults to 1 (the position axis).
    :return: `t` summed over `dim` divided by the number of True mask entries along `dim`.
             Rows with no True entries produce zeros rather than NaN.
    """
    # Zero out masked positions so they contribute nothing to the sum.
    t = t.masked_fill(~mask[:, :, None], 0.)
    # Clamp the count to at least 1 so a fully-masked row divides 0 by 1 instead of 0 by 0.
    kept = mask.sum(dim = dim).clamp(min = 1)
    return t.sum(dim = dim) / kept[..., None]
|
|
|
|
|
|
|
|
|
|
|
|
class VoiceCLIP(nn.Module):
    """
    CLIP model retrofitted for performing contrastive evaluation between tokenized audio data and the corresponding
    transcribed text.

    Originally from https://github.com/lucidrains/DALLE-pytorch/blob/main/dalle_pytorch/dalle_pytorch.py
    """

    def __init__(
            self,
            *,
            dim_text=512,
            dim_speech=512,
            dim_latent=512,
            num_text_tokens=256,
            text_enc_depth=6,
            text_seq_len=120,
            text_heads=8,
            num_speech_tokens=8192,
            speech_enc_depth=6,
            speech_heads=8,
            speech_seq_len=250,
            text_mask_percentage=0,
            wav_token_compression=1024,
    ):
        """
        All arguments are keyword-only.

        :param dim_text: Width of the text embeddings and the text transformer.
        :param dim_speech: Width of the speech embeddings and the speech transformer.
        :param dim_latent: Width of the shared latent space both encoders are projected into.
        :param num_text_tokens: Number of rows in the text token embedding table.
        :param text_enc_depth: `depth` passed to the text transformer.
        :param text_seq_len: `seq_len` passed to the text transformer; also the number of text position embeddings.
        :param text_heads: `heads` passed to the text transformer.
        :param num_speech_tokens: Number of rows in the speech token embedding table (and, currently, in the
                                  speech position embedding table).
        :param speech_enc_depth: `depth` passed to the speech transformer.
        :param speech_heads: `heads` passed to the speech transformer.
        :param speech_seq_len: `seq_len` passed to the speech transformer.
        :param text_mask_percentage: Threshold in [0, 1) used to randomly drop text positions during training.
                                     0 disables masking.
        :param wav_token_compression: Divisor applied to `wav_lengths` in forward() to obtain the number of
                                      speech tokens to keep.
        """
        super().__init__()
        self.text_emb = nn.Embedding(num_text_tokens, dim_text)
        self.text_pos_emb = nn.Embedding(text_seq_len, dim_text)
        self.text_transformer = Transformer(causal=False, seq_len=text_seq_len, dim=dim_text, depth=text_enc_depth,
                                            heads=text_heads, rotary_emb=False)
        self.to_text_latent = nn.Linear(dim_text, dim_latent, bias=False)

        self.speech_emb = nn.Embedding(num_speech_tokens, dim_speech)
        # NOTE(review): this positional table is sized by num_speech_tokens rather than speech_seq_len. It is left
        # unchanged here because resizing it would change the shape of a registered parameter.
        self.speech_pos_emb = nn.Embedding(num_speech_tokens, dim_speech)
        self.speech_transformer = Transformer(causal=False, seq_len=speech_seq_len, dim=dim_speech,
                                              depth=speech_enc_depth, heads=speech_heads, rotary_emb=False)
        self.to_speech_latent = nn.Linear(dim_speech, dim_latent, bias=False)

        # Learned scalar; forward() exponentiates it before scaling the similarities.
        self.temperature = nn.Parameter(torch.tensor(1.))
        self.text_mask_percentage = text_mask_percentage
        self.wav_token_compression = wav_token_compression

    def forward(
            self,
            text,
            text_lengths,
            speech_tokens,
            wav_lengths,
            return_loss=False
    ):
        """
        Encode text and speech tokens and compare them in the shared latent space.

        :param text: Integer text token ids, indexed as [batch, position].
        :param text_lengths: Tensor of text lengths; only its maximum is used, to trim trailing padding.
        :param speech_tokens: Integer speech token ids, indexed as [batch, position].
        :param wav_lengths: Tensor of waveform lengths; its maximum divided by `wav_token_compression` is the number
                            of speech tokens kept.
        :param return_loss: When False, return one similarity per (text, speech) pair in the batch. When True,
                            return the symmetric cross-entropy loss over the batch-by-batch similarity matrix.
        :return: A 1-D tensor of similarities, or a scalar loss when `return_loss` is True.
        """
        # This model will receive micro-batches with a ton of padding for both the text and MELs. Ameliorate this by
        # chopping the inputs by the maximum actual length.
        max_text_len = text_lengths.max()
        text = text[:, :max_text_len]
        max_mel_len = wav_lengths.max() // self.wav_token_compression
        speech_tokens = speech_tokens[:, :max_mel_len]

        b, device = text.shape[0], text.device

        # Random token dropping is a training-time regulariser only. Applying it in eval mode would make the
        # returned similarities non-deterministic, and with a percentage of 0 `rand > 0` could still drop a token
        # because rand samples from [0, 1).
        apply_text_mask = self.training and self.text_mask_percentage > 0
        if apply_text_mask:
            text_mask = torch.rand_like(text.float()) > self.text_mask_percentage
        else:
            text_mask = torch.ones_like(text, dtype=torch.bool)

        text_emb = self.text_emb(text)
        text_emb += self.text_pos_emb(torch.arange(text.shape[1], device=device))

        speech_emb = self.speech_emb(speech_tokens)
        speech_emb += self.speech_pos_emb(torch.arange(speech_emb.shape[1], device=device))

        enc_text = self.text_transformer(text_emb, mask=text_mask)
        enc_speech = self.speech_transformer(speech_emb)

        # Pool each sequence into a single vector; dropped text positions are excluded from the average.
        if apply_text_mask:
            text_latents = masked_mean(enc_text, text_mask, dim=1)
        else:
            text_latents = enc_text.mean(dim=1)

        speech_latents = enc_speech.mean(dim=1)

        text_latents = self.to_text_latent(text_latents)
        speech_latents = self.to_speech_latent(speech_latents)

        # L2-normalise so the dot products below are cosine similarities.
        text_latents, speech_latents = map(lambda t: F.normalize(t, p=2, dim=-1), (text_latents, speech_latents))

        temp = self.temperature.exp()

        if not return_loss:
            # Row-wise similarity between the i-th text and the i-th speech clip.
            sim = einsum('n d, n d -> n', text_latents, speech_latents) * temp
            return sim

        # Full similarity matrix; the matching pairs sit on the diagonal, hence labels = arange(b).
        sim = einsum('i d, j d -> i j', text_latents, speech_latents) * temp
        labels = torch.arange(b, device=device)
        loss = (F.cross_entropy(sim, labels) + F.cross_entropy(sim.t(), labels)) / 2
        return loss
|
|
|
|
|
|
|
|
|
|
|
|
@register_model
def register_voice_clip(opt_net, opt):
    """Build a VoiceCLIP from the 'kwargs' entry of `opt_net` (an empty dict when the entry is absent)."""
    model_kwargs = opt_get(opt_net, ['kwargs'], {})
    model = VoiceCLIP(**model_kwargs)
    return model
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    # Smoke test: run one loss computation on random tokens.
    # forward() takes (text, text_lengths, speech_tokens, wav_lengths), so the length tensors must be supplied.
    clip = VoiceCLIP(text_mask_percentage=.2)
    clip(torch.randint(0,256,(2,120)),
         torch.tensor([120, 100]),
         torch.randint(0,8192,(2,250)),
         # Waveform lengths are divided by wav_token_compression (1024 by default) to get the speech token count.
         torch.tensor([250 * 1024, 200 * 1024]),
         return_loss=True)
|