1
0

frontend.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. # Copyright (c) 2024 Alibaba Inc (authors: Xiang Lyu)
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from functools import partial
  15. import onnxruntime
  16. import torch
  17. import numpy as np
  18. import whisper
  19. from typing import Callable
  20. import torchaudio.compliance.kaldi as kaldi
  21. import torchaudio
  22. import os
  23. import re
  24. import inflect
  25. try:
  26. import ttsfrd
  27. use_ttsfrd = True
  28. except ImportError:
  29. print("failed to import ttsfrd, use WeTextProcessing instead")
  30. from tn.chinese.normalizer import Normalizer as ZhNormalizer
  31. from tn.english.normalizer import Normalizer as EnNormalizer
  32. use_ttsfrd = False
  33. from cosyvoice.utils.frontend_utils import contains_chinese, replace_blank, replace_corner_mark, remove_bracket, spell_out_number, split_paragraph
  34. class CosyVoiceFrontEnd:
  35. def __init__(self,
  36. get_tokenizer: Callable,
  37. feat_extractor: Callable,
  38. campplus_model: str,
  39. speech_tokenizer_model: str,
  40. spk2info: str = '',
  41. instruct: bool = False,
  42. allowed_special: str = 'all'):
  43. self.tokenizer = get_tokenizer()
  44. self.feat_extractor = feat_extractor
  45. self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  46. option = onnxruntime.SessionOptions()
  47. option.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
  48. option.intra_op_num_threads = 1
  49. self.campplus_session = onnxruntime.InferenceSession(campplus_model, sess_options=option, providers=["CPUExecutionProvider"])
  50. self.speech_tokenizer_session = onnxruntime.InferenceSession(speech_tokenizer_model, sess_options=option, providers=["CUDAExecutionProvider"if torch.cuda.is_available() else "CPUExecutionProvider"])
  51. if os.path.exists(spk2info):
  52. self.spk2info = torch.load(spk2info, map_location=self.device)
  53. self.instruct = instruct
  54. self.allowed_special = allowed_special
  55. self.inflect_parser = inflect.engine()
  56. self.use_ttsfrd = use_ttsfrd
  57. if self.use_ttsfrd:
  58. self.frd = ttsfrd.TtsFrontendEngine()
  59. ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
  60. assert self.frd.initialize('{}/../../pretrained_models/CosyVoice-ttsfrd/resource'.format(ROOT_DIR)) is True, 'failed to initialize ttsfrd resource'
  61. self.frd.set_lang_type('pinyin')
  62. self.frd.enable_pinyin_mix(True)
  63. self.frd.set_breakmodel_index(1)
  64. else:
  65. self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False)
  66. self.en_tn_model = EnNormalizer()
  67. def _extract_text_token(self, text):
  68. text_token = self.tokenizer.encode(text, allowed_special=self.allowed_special)
  69. text_token = torch.tensor([text_token], dtype=torch.int32).to(self.device)
  70. text_token_len = torch.tensor([text_token.shape[1]], dtype=torch.int32).to(self.device)
  71. return text_token, text_token_len
  72. def _extract_speech_token(self, speech):
  73. feat = whisper.log_mel_spectrogram(speech, n_mels=128)
  74. speech_token = self.speech_tokenizer_session.run(None, {self.speech_tokenizer_session.get_inputs()[0].name: feat.detach().cpu().numpy(),
  75. self.speech_tokenizer_session.get_inputs()[1].name: np.array([feat.shape[2]], dtype=np.int32)})[0].flatten().tolist()
  76. speech_token = torch.tensor([speech_token], dtype=torch.int32).to(self.device)
  77. speech_token_len = torch.tensor([speech_token.shape[1]], dtype=torch.int32).to(self.device)
  78. return speech_token, speech_token_len
  79. def _extract_spk_embedding(self, speech):
  80. feat = kaldi.fbank(speech,
  81. num_mel_bins=80,
  82. dither=0,
  83. sample_frequency=16000)
  84. feat = feat - feat.mean(dim=0, keepdim=True)
  85. embedding = self.campplus_session.run(None, {self.campplus_session.get_inputs()[0].name: feat.unsqueeze(dim=0).cpu().numpy()})[0].flatten().tolist()
  86. embedding = torch.tensor([embedding]).to(self.device)
  87. return embedding
  88. def _extract_speech_feat(self, speech):
  89. speech_feat = self.feat_extractor(speech).squeeze(dim=0).transpose(0, 1).to(self.device)
  90. speech_feat = speech_feat.unsqueeze(dim=0)
  91. speech_feat_len = torch.tensor([speech_feat.shape[1]], dtype=torch.int32).to(self.device)
  92. return speech_feat, speech_feat_len
  93. def text_normalize(self, text, split=True):
  94. text = text.strip()
  95. if contains_chinese(text):
  96. if self.use_ttsfrd:
  97. text = self.frd.get_frd_extra_info(text, 'input')
  98. else:
  99. text = self.zh_tn_model.normalize(text)
  100. text = text.replace("\n", "")
  101. text = replace_blank(text)
  102. text = replace_corner_mark(text)
  103. text = text.replace(".", "、")
  104. text = text.replace(" - ", ",")
  105. text = remove_bracket(text)
  106. text = re.sub(r'[,,]+$', '。', text)
  107. texts = [i for i in split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "zh", token_max_n=80,
  108. token_min_n=60, merge_len=20,
  109. comma_split=False)]
  110. else:
  111. if self.use_ttsfrd:
  112. text = self.frd.get_frd_extra_info(text, 'input')
  113. else:
  114. text = self.en_tn_model.normalize(text)
  115. text = spell_out_number(text, self.inflect_parser)
  116. texts = [i for i in split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "en", token_max_n=80,
  117. token_min_n=60, merge_len=20,
  118. comma_split=False)]
  119. if split is False:
  120. return text
  121. return texts
  122. def frontend_sft(self, tts_text, spk_id):
  123. tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
  124. embedding = self.spk2info[spk_id]['embedding']
  125. model_input = {'text': tts_text_token, 'text_len': tts_text_token_len, 'llm_embedding': embedding, 'flow_embedding': embedding}
  126. return model_input
  127. def frontend_zero_shot(self, tts_text, prompt_text, prompt_speech_16k):
  128. tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
  129. prompt_text_token, prompt_text_token_len = self._extract_text_token(prompt_text)
  130. prompt_speech_22050 = torchaudio.transforms.Resample(orig_freq=16000, new_freq=22050)(prompt_speech_16k)
  131. speech_feat, speech_feat_len = self._extract_speech_feat(prompt_speech_22050)
  132. speech_token, speech_token_len = self._extract_speech_token(prompt_speech_16k)
  133. embedding = self._extract_spk_embedding(prompt_speech_16k)
  134. model_input = {'text': tts_text_token, 'text_len': tts_text_token_len,
  135. 'prompt_text': prompt_text_token, 'prompt_text_len': prompt_text_token_len,
  136. 'llm_prompt_speech_token': speech_token, 'llm_prompt_speech_token_len': speech_token_len,
  137. 'flow_prompt_speech_token': speech_token, 'flow_prompt_speech_token_len': speech_token_len,
  138. 'prompt_speech_feat': speech_feat, 'prompt_speech_feat_len': speech_feat_len,
  139. 'llm_embedding': embedding, 'flow_embedding': embedding}
  140. return model_input
  141. def frontend_cross_lingual(self, tts_text, prompt_speech_16k):
  142. model_input = self.frontend_zero_shot(tts_text, '', prompt_speech_16k)
  143. # in cross lingual mode, we remove prompt in llm
  144. del model_input['prompt_text']
  145. del model_input['prompt_text_len']
  146. del model_input['llm_prompt_speech_token']
  147. del model_input['llm_prompt_speech_token_len']
  148. return model_input
  149. def frontend_instruct(self, tts_text, spk_id, instruct_text):
  150. model_input = self.frontend_sft(tts_text, spk_id)
  151. # in instruct mode, we remove spk_embedding in llm due to information leakage
  152. del model_input['llm_embedding']
  153. instruct_text_token, instruct_text_token_len = self._extract_text_token(instruct_text + '<endofprompt>')
  154. model_input['prompt_text'] = instruct_text_token
  155. model_input['prompt_text_len'] = instruct_text_token_len
  156. return model_input