lyuxiang.lx 1 yıl önce
ebeveyn
işleme
0bf706c26f
1 değiştirilmiş dosya ile 322 ekleme ve 0 silme
  1. 322 0
      cosyvoice/transformer/upsample_encoder.py

+ 322 - 0
cosyvoice/transformer/upsample_encoder.py

@@ -0,0 +1,322 @@
+# Copyright (c) 2021 Mobvoi Inc (Binbin Zhang, Di Wu)
+#               2022 Xingchen Song (sxc19@mails.tsinghua.edu.cn)
+#               2024 Alibaba Inc (Xiang Lyu)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# Modified from ESPnet(https://github.com/espnet/espnet)
+"""Encoder definition."""
+from typing import Tuple
+
+import torch
+from torch import nn
+import torch.utils.checkpoint as ckpt
+from torch.nn import functional as F
+
+from cosyvoice.transformer.convolution import ConvolutionModule
+from cosyvoice.transformer.encoder_layer import ConformerEncoderLayer
+from cosyvoice.transformer.positionwise_feed_forward import PositionwiseFeedForward
+from cosyvoice.utils.class_utils import (
+    COSYVOICE_EMB_CLASSES,
+    COSYVOICE_SUBSAMPLE_CLASSES,
+    COSYVOICE_ATTENTION_CLASSES,
+    COSYVOICE_ACTIVATION_CLASSES,
+)
+from cosyvoice.utils.mask import make_pad_mask
+from cosyvoice.utils.mask import add_optional_chunk_mask
+
+
+class Upsample1D(nn.Module):
+    """A 1D upsampling layer with an optional convolution.
+
+    Parameters:
+        channels (`int`):
+            number of channels in the inputs and outputs.
+        use_conv (`bool`, default `False`):
+            option to use a convolution.
+        use_conv_transpose (`bool`, default `False`):
+            option to use a convolution transpose.
+        out_channels (`int`, optional):
+            number of output channels. Defaults to `channels`.
+    """
+
+    def __init__(self, channels: int, out_channels: int, stride: int=2):
+        super().__init__()
+        self.channels = channels
+        self.out_channels = out_channels
+        self.stride = stride
+        # In this mode, first repeat interpolate, than conv with stride=1
+        self.conv = nn.Conv1d(
+            self.channels, self.out_channels, stride*2+1, stride=1,
+            padding=0,
+        )
+
+    def forward(self, inputs: torch.Tensor, input_lengths: torch.Tensor):
+        outputs = F.interpolate(inputs, scale_factor=float(self.stride), mode="nearest")
+        outputs = F.pad(outputs, (self.stride * 2, 0), value=0.0)
+        outputs = self.conv(outputs)
+        return outputs, input_lengths * self.stride
+
+
+class PreLookaheadLayer(nn.Module):
+    def __init__(self, channels: int, pre_lookahead_len: int = 1):
+        super().__init__()
+        self.channels = channels
+        self.pre_lookahead_len = pre_lookahead_len
+        self.conv1 = nn.Conv1d(
+            channels, channels,
+            kernel_size=pre_lookahead_len+1,
+            stride=1, padding=0,
+        )
+        self.conv2 = nn.Conv1d(
+            channels, channels,
+            kernel_size=3, stride=1, padding=0,
+        )
+
+    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
+        """
+        inputs: (batch_size, seq_len, channels)
+        """
+        outputs = inputs.transpose(1, 2).contiguous()
+        # look ahead
+        outputs = F.pad(outputs, (0, self.pre_lookahead_len), mode='constant', value=0.0)
+        outputs = F.leaky_relu(self.conv1(outputs))
+        # outputs
+        outputs = F.pad(outputs, (2, 0), mode='constant', value=0.0)
+        outputs = self.conv2(outputs)
+        outputs = outputs.transpose(1, 2).contiguous()
+
+        # residual connection
+        outputs = outputs + inputs
+        return outputs
+
+
+class UpsampleConformerEncoder(torch.nn.Module):
+
+    def __init__(
+        self,
+        input_size: int,
+        output_size: int = 256,
+        attention_heads: int = 4,
+        linear_units: int = 2048,
+        num_blocks: int = 6,
+        dropout_rate: float = 0.1,
+        positional_dropout_rate: float = 0.1,
+        attention_dropout_rate: float = 0.0,
+        input_layer: str = "conv2d",
+        pos_enc_layer_type: str = "rel_pos",
+        normalize_before: bool = True,
+        static_chunk_size: int = 0,
+        use_dynamic_chunk: bool = False,
+        global_cmvn: torch.nn.Module = None,
+        use_dynamic_left_chunk: bool = False,
+        positionwise_conv_kernel_size: int = 1,
+        macaron_style: bool = True,
+        selfattention_layer_type: str = "rel_selfattn",
+        activation_type: str = "swish",
+        use_cnn_module: bool = True,
+        cnn_module_kernel: int = 15,
+        causal: bool = False,
+        cnn_module_norm: str = "batch_norm",
+        key_bias: bool = True,
+        gradient_checkpointing: bool = False,
+    ):
+        """
+        Args:
+            input_size (int): input dim
+            output_size (int): dimension of attention
+            attention_heads (int): the number of heads of multi head attention
+            linear_units (int): the hidden units number of position-wise feed
+                forward
+            num_blocks (int): the number of decoder blocks
+            dropout_rate (float): dropout rate
+            attention_dropout_rate (float): dropout rate in attention
+            positional_dropout_rate (float): dropout rate after adding
+                positional encoding
+            input_layer (str): input layer type.
+                optional [linear, conv2d, conv2d6, conv2d8]
+            pos_enc_layer_type (str): Encoder positional encoding layer type.
+                opitonal [abs_pos, scaled_abs_pos, rel_pos, no_pos]
+            normalize_before (bool):
+                True: use layer_norm before each sub-block of a layer.
+                False: use layer_norm after each sub-block of a layer.
+            static_chunk_size (int): chunk size for static chunk training and
+                decoding
+            use_dynamic_chunk (bool): whether use dynamic chunk size for
+                training or not, You can only use fixed chunk(chunk_size > 0)
+                or dyanmic chunk size(use_dynamic_chunk = True)
+            global_cmvn (Optional[torch.nn.Module]): Optional GlobalCMVN module
+            use_dynamic_left_chunk (bool): whether use dynamic left chunk in
+                dynamic chunk training
+            key_bias: whether use bias in attention.linear_k, False for whisper models.
+            gradient_checkpointing: rerunning a forward-pass segment for each
+                checkpointed segment during backward.
+        """
+        super().__init__()
+        self._output_size = output_size
+
+        self.global_cmvn = global_cmvn
+        self.embed = COSYVOICE_SUBSAMPLE_CLASSES[input_layer](
+            input_size,
+            output_size,
+            dropout_rate,
+            COSYVOICE_EMB_CLASSES[pos_enc_layer_type](output_size,
+                                                      positional_dropout_rate),
+        )
+
+        self.normalize_before = normalize_before
+        self.after_norm = torch.nn.LayerNorm(output_size, eps=1e-5)
+        self.static_chunk_size = static_chunk_size
+        self.use_dynamic_chunk = use_dynamic_chunk
+        self.use_dynamic_left_chunk = use_dynamic_left_chunk
+        self.gradient_checkpointing = gradient_checkpointing
+        activation = COSYVOICE_ACTIVATION_CLASSES[activation_type]()
+        # self-attention module definition
+        encoder_selfattn_layer_args = (
+            attention_heads,
+            output_size,
+            attention_dropout_rate,
+            key_bias,
+        )
+        # feed-forward module definition
+        positionwise_layer_args = (
+            output_size,
+            linear_units,
+            dropout_rate,
+            activation,
+        )
+        # convolution module definition
+        convolution_layer_args = (output_size, cnn_module_kernel, activation,
+                                  cnn_module_norm, causal)
+        self.pre_lookahead_layer = PreLookaheadLayer(channels=512, pre_lookahead_len=3)
+        self.encoders = torch.nn.ModuleList([
+            ConformerEncoderLayer(
+                output_size,
+                COSYVOICE_ATTENTION_CLASSES[selfattention_layer_type](
+                    *encoder_selfattn_layer_args),
+                PositionwiseFeedForward(*positionwise_layer_args),
+                PositionwiseFeedForward(
+                    *positionwise_layer_args) if macaron_style else None,
+                ConvolutionModule(
+                    *convolution_layer_args) if use_cnn_module else None,
+                dropout_rate,
+                normalize_before,
+            ) for _ in range(num_blocks)
+        ])
+        self.up_layer = Upsample1D(channels=512, out_channels=512, stride=2)
+        self.up_embed = COSYVOICE_SUBSAMPLE_CLASSES[input_layer](
+            input_size,
+            output_size,
+            dropout_rate,
+            COSYVOICE_EMB_CLASSES[pos_enc_layer_type](output_size,
+                                                      positional_dropout_rate),
+        )
+        self.up_encoders = torch.nn.ModuleList([
+            ConformerEncoderLayer(
+                output_size,
+                COSYVOICE_ATTENTION_CLASSES[selfattention_layer_type](
+                    *encoder_selfattn_layer_args),
+                PositionwiseFeedForward(*positionwise_layer_args),
+                PositionwiseFeedForward(
+                    *positionwise_layer_args) if macaron_style else None,
+                ConvolutionModule(
+                    *convolution_layer_args) if use_cnn_module else None,
+                dropout_rate,
+                normalize_before,
+            ) for _ in range(4)
+        ])
+
+    def output_size(self) -> int:
+        return self._output_size
+
+    def forward(
+        self,
+        xs: torch.Tensor,
+        xs_lens: torch.Tensor,
+        decoding_chunk_size: int = 0,
+        num_decoding_left_chunks: int = -1,
+    ) -> Tuple[torch.Tensor, torch.Tensor]:
+        """Embed positions in tensor.
+
+        Args:
+            xs: padded input tensor (B, T, D)
+            xs_lens: input length (B)
+            decoding_chunk_size: decoding chunk size for dynamic chunk
+                0: default for training, use random dynamic chunk.
+                <0: for decoding, use full chunk.
+                >0: for decoding, use fixed chunk size as set.
+            num_decoding_left_chunks: number of left chunks, this is for decoding,
+            the chunk size is decoding_chunk_size.
+                >=0: use num_decoding_left_chunks
+                <0: use all left chunks
+        Returns:
+            encoder output tensor xs, and subsampled masks
+            xs: padded output tensor (B, T' ~= T/subsample_rate, D)
+            masks: torch.Tensor batch padding mask after subsample
+                (B, 1, T' ~= T/subsample_rate)
+        NOTE(xcsong):
+            We pass the `__call__` method of the modules instead of `forward` to the
+            checkpointing API because `__call__` attaches all the hooks of the module.
+            https://discuss.pytorch.org/t/any-different-between-model-input-and-model-forward-input/3690/2
+        """
+        T = xs.size(1)
+        masks = ~make_pad_mask(xs_lens, T).unsqueeze(1)  # (B, 1, T)
+        if self.global_cmvn is not None:
+            xs = self.global_cmvn(xs)
+        xs, pos_emb, masks = self.embed(xs, masks)
+        mask_pad = masks  # (B, 1, T/subsample_rate)
+        chunk_masks = add_optional_chunk_mask(xs, masks,
+                                              self.use_dynamic_chunk,
+                                              self.use_dynamic_left_chunk,
+                                              decoding_chunk_size,
+                                              self.static_chunk_size,
+                                              num_decoding_left_chunks)
+        # lookahead + conformer encoder
+        xs = self.pre_lookahead_layer(xs)
+        xs = self.forward_layers(xs, chunk_masks, pos_emb, mask_pad)
+
+        # upsample + conformer encoder
+        xs = xs.transpose(1, 2).contiguous()
+        xs, xs_lens = self.up_layer(xs, xs_lens)
+        xs = xs.transpose(1, 2).contiguous()
+        T = xs.size(1)
+        masks = ~make_pad_mask(xs_lens, T).unsqueeze(1)  # (B, 1, T)
+        xs, pos_emb, masks = self.up_embed(xs, masks)
+        mask_pad = masks  # (B, 1, T/subsample_rate)
+        chunk_masks = add_optional_chunk_mask(xs, masks,
+                                              self.use_dynamic_chunk,
+                                              self.use_dynamic_left_chunk,
+                                              decoding_chunk_size,
+                                              self.static_chunk_size * self.up_layer.stride,
+                                              num_decoding_left_chunks)
+        xs = self.forward_up_layers(xs, chunk_masks, pos_emb, mask_pad)
+
+        if self.normalize_before:
+            xs = self.after_norm(xs)
+        # Here we assume the mask is not changed in encoder layers, so just
+        # return the masks before encoder layers, and the masks will be used
+        # for cross attention with decoder later
+        return xs, masks
+
+    def forward_layers(self, xs: torch.Tensor, chunk_masks: torch.Tensor,
+                       pos_emb: torch.Tensor,
+                       mask_pad: torch.Tensor) -> torch.Tensor:
+        for layer in self.encoders:
+            xs, chunk_masks, _, _ = layer(xs, chunk_masks, pos_emb, mask_pad)
+        return xs
+
+    def forward_up_layers(self, xs: torch.Tensor, chunk_masks: torch.Tensor,
+                       pos_emb: torch.Tensor,
+                       mask_pad: torch.Tensor) -> torch.Tensor:
+        for layer in self.up_encoders:
+            xs, chunk_masks, _, _ = layer(xs, chunk_masks, pos_emb, mask_pad)
+        return xs