# Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#  * Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
#  * Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#  * Neither the name of NVIDIA CORPORATION nor the names of its
#    contributors may be used to endorse or promote products derived
#    from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import json
import os

import onnxruntime
import torch
import torchaudio.compliance.kaldi as kaldi
import triton_python_backend_utils as pb_utils
from torch.utils.dlpack import to_dlpack

from cosyvoice.utils.common import TrtContextWrapper
from cosyvoice.utils.file_utils import convert_onnx_to_trt


class TritonPythonModel:
    """Triton Python model for speaker embedding extraction.

    This model takes reference audio input and extracts a speaker (prompt)
    embedding with the CAM++ (campplus) model, using either a TensorRT
    engine or an ONNX Runtime session.
    """

    def initialize(self, args):
        """Initialize the model.

        Args:
            args: Dictionary containing model configuration
        """
        # Parse model parameters
        parameters = json.loads(args['model_config'])['parameters']
        model_params = {k: v["string_value"] for k, v in parameters.items()}

        self.device = torch.device("cuda")
        model_dir = model_params["model_dir"]
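        # NOTE: the GPU tag used in the engine file name and the TensorRT switch
        # are hardcoded below; adjust them to match your deployment if needed.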
- gpu = "l20"
- enable_trt = True
- if enable_trt:
- self.load_spk_trt(f'{model_dir}/campplus.{gpu}.fp32.trt',
- f'{model_dir}/campplus.onnx',
- 1,
- False)
- else:
- campplus_model = f'{model_dir}/campplus.onnx'
- option = onnxruntime.SessionOptions()
- option.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
- option.intra_op_num_threads = 1
- self.spk_model = onnxruntime.InferenceSession(campplus_model, sess_options=option, providers=["CPUExecutionProvider"])

    def load_spk_trt(self, spk_model, spk_onnx_model, trt_concurrent=1, fp16=True):
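        # Build the TensorRT engine from the ONNX model on first use, then
        # deserialize it and wrap it for (optionally concurrent) execution.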
        if not os.path.exists(spk_model) or os.path.getsize(spk_model) == 0:
            trt_kwargs = self.get_spk_trt_kwargs()
            convert_onnx_to_trt(spk_model, trt_kwargs, spk_onnx_model, fp16)
        import tensorrt as trt
        with open(spk_model, 'rb') as f:
            spk_engine = trt.Runtime(trt.Logger(trt.Logger.INFO)).deserialize_cuda_engine(f.read())
        assert spk_engine is not None, 'failed to load trt {}'.format(spk_model)
        self.spk_model = TrtContextWrapper(spk_engine, trt_concurrent=trt_concurrent, device=self.device)

    def get_spk_trt_kwargs(self):
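        # Dynamic shape profile for the single 'input' tensor of shape
        # (batch, num_frames, num_mel_bins): 4 to 3000 frames of 80-dim fbank.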
        min_shape = [(1, 4, 80)]
        opt_shape = [(1, 500, 80)]
        max_shape = [(1, 3000, 80)]
        input_names = ["input"]
        return {'min_shape': min_shape, 'opt_shape': opt_shape, 'max_shape': max_shape, 'input_names': input_names}

    def _extract_spk_embedding(self, speech):
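        # 80-dim Kaldi fbank features at 16 kHz, followed by per-utterance
        # mean normalization of the features.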
        feat = kaldi.fbank(speech,
                           num_mel_bins=80,
                           dither=0,
                           sample_frequency=16000)
        spk_feat = feat - feat.mean(dim=0, keepdim=True)
        if isinstance(self.spk_model, onnxruntime.InferenceSession):
            embedding = self.spk_model.run(
                None, {self.spk_model.get_inputs()[0].name: spk_feat.unsqueeze(dim=0).cpu().numpy()}
            )[0].flatten().tolist()
            embedding = torch.tensor([embedding]).to(self.device)
        else:
            [spk_model, stream], trt_engine = self.spk_model.acquire_estimator()
            # NOTE need to synchronize when switching stream
            with torch.cuda.device(self.device):
                torch.cuda.current_stream().synchronize()
                spk_feat = spk_feat.unsqueeze(dim=0).to(self.device)
                batch_size = spk_feat.size(0)
                with stream:
                    spk_model.set_input_shape('input', (batch_size, spk_feat.size(1), 80))
                    embedding = torch.empty((batch_size, 192), device=spk_feat.device)
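                    # Bind input/output device pointers in the order of the
                    # engine's I/O tensor names, then launch on the acquired stream.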
                    data_ptrs = [spk_feat.contiguous().data_ptr(),
                                 embedding.contiguous().data_ptr()]
                    for i, j in enumerate(data_ptrs):
                        spk_model.set_tensor_address(trt_engine.get_tensor_name(i), j)
                    # run trt engine
                    assert spk_model.execute_async_v3(torch.cuda.current_stream().cuda_stream) is True
                torch.cuda.current_stream().synchronize()
            self.spk_model.release_estimator(spk_model, stream)
        return embedding.half()

    def execute(self, requests):
        """Execute inference on the batched requests.

        Args:
            requests: List of inference requests

        Returns:
            List of inference responses containing the speaker embedding outputs
        """
        responses = []
        # Process each request in batch
        for request in requests:
            # Extract input tensors
            wav_array = pb_utils.get_input_tensor_by_name(
                request, "reference_wav").as_numpy()
            wav_array = torch.from_numpy(wav_array).to(self.device)
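            # kaldi.fbank treats the first dimension as the channel dimension, so
            # the reference waveform is expected to arrive as a single
            # (1, num_samples) float32 utterance sampled at 16 kHz.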
            embedding = self._extract_spk_embedding(wav_array)

            prompt_spk_embedding_tensor = pb_utils.Tensor.from_dlpack(
                "prompt_spk_embedding", to_dlpack(embedding))
            inference_response = pb_utils.InferenceResponse(
                output_tensors=[prompt_spk_embedding_tensor])
            responses.append(inference_response)

        return responses
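

if __name__ == "__main__":
    # Minimal local sketch, not executed by Triton (which imports this module):
    # it shows how a gRPC client might query this model. The server URL, the
    # model name "speaker_embedding", and the FP32 input layout are assumptions
    # for illustration; match them to your actual config.pbtxt and deployment.
    import numpy as np
    import tritonclient.grpc as grpcclient

    client = grpcclient.InferenceServerClient("localhost:8001")
    wav = np.zeros((1, 16000), dtype=np.float32)  # 1 second of silence at 16 kHz
    ref_wav = grpcclient.InferInput("reference_wav", list(wav.shape), "FP32")
    ref_wav.set_data_from_numpy(wav)
    output = grpcclient.InferRequestedOutput("prompt_spk_embedding")
    result = client.infer("speaker_embedding", inputs=[ref_wav], outputs=[output])
    print(result.as_numpy("prompt_spk_embedding").shape)  # expected (1, 192)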