# Copyright (c) 2024 Alibaba Inc (authors: Xiang Lyu)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
import re
from functools import partial
from typing import Callable, Generator

import inflect
import numpy as np
import onnxruntime
import torch
import torchaudio.compliance.kaldi as kaldi
import whisper

from cosyvoice.utils.file_utils import logging, load_wav
from cosyvoice.utils.frontend_utils import (contains_chinese, replace_blank, replace_corner_mark,
                                            remove_bracket, spell_out_number, split_paragraph,
                                            is_only_punctuation)


class CosyVoiceFrontEnd:

    def __init__(self,
                 get_tokenizer: Callable,
                 feat_extractor: Callable,
                 campplus_model: str,
                 speech_tokenizer_model: str,
                 spk2info: str = '',
                 allowed_special: str = 'all'):
        self.tokenizer = get_tokenizer()
        self.feat_extractor = feat_extractor
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        option = onnxruntime.SessionOptions()
        option.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
        option.intra_op_num_threads = 1
        # the speaker-embedding model always runs on CPU; the speech tokenizer uses CUDA when available
        self.campplus_session = onnxruntime.InferenceSession(campplus_model, sess_options=option,
                                                             providers=["CPUExecutionProvider"])
        self.speech_tokenizer_session = onnxruntime.InferenceSession(speech_tokenizer_model, sess_options=option,
                                                                     providers=["CUDAExecutionProvider" if torch.cuda.is_available() else
                                                                                "CPUExecutionProvider"])
        if os.path.exists(spk2info):
            self.spk2info = torch.load(spk2info, map_location=self.device)
        else:
            self.spk2info = {}
        self.allowed_special = allowed_special
        self.inflect_parser = inflect.engine()
        # NOTE fall back gracefully when no text frontend tool is available
        try:
            import ttsfrd
            self.frd = ttsfrd.TtsFrontendEngine()
            ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
            assert self.frd.initialize('{}/../../pretrained_models/CosyVoice-ttsfrd/resource'.format(ROOT_DIR)) is True, \
                'failed to initialize ttsfrd resource'
            self.frd.set_lang_type('pinyinvg')
            self.text_frontend = 'ttsfrd'
            logging.info('use ttsfrd frontend')
        except Exception:
            try:
                # wetext provides one Normalizer class, instantiated separately for zh and en
                from wetext import Normalizer
                self.zh_tn_model = Normalizer(remove_erhua=False)
                self.en_tn_model = Normalizer()
                self.text_frontend = 'wetext'
                logging.info('use wetext frontend')
            except Exception:
                self.text_frontend = ''
                logging.info('no text frontend is available')
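
        # the resulting self.text_frontend is one of 'ttsfrd', 'wetext' or '',
        # and is consulted in text_normalize below: ttsfrd normalizes and splits
        # sentences in one pass, wetext only normalizes, and '' skips tool-based
        # normalization entirely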

    def _extract_text_token(self, text):
        if isinstance(text, Generator):
            logging.info('get tts_text generator, will return _extract_text_token_generator!')
            # NOTE add a dummy text_token_len for compatibility
            return self._extract_text_token_generator(text), torch.tensor([0], dtype=torch.int32).to(self.device)
        else:
            text_token = self.tokenizer.encode(text, allowed_special=self.allowed_special)
            text_token = torch.tensor([text_token], dtype=torch.int32).to(self.device)
            text_token_len = torch.tensor([text_token.shape[1]], dtype=torch.int32).to(self.device)
            return text_token, text_token_len

    def _extract_text_token_generator(self, text_generator):
        # emit tokens one at a time so streaming text input stays streaming
        for text in text_generator:
            text_token, _ = self._extract_text_token(text)
            for i in range(text_token.shape[1]):
                yield text_token[:, i: i + 1]

    def _extract_speech_token(self, prompt_wav):
        speech = load_wav(prompt_wav, 16000)
        assert speech.shape[1] / 16000 <= 30, 'do not support extracting speech tokens for audio longer than 30s'
        feat = whisper.log_mel_spectrogram(speech, n_mels=128)
        speech_token = self.speech_tokenizer_session.run(None,
                                                         {self.speech_tokenizer_session.get_inputs()[0].name:
                                                          feat.detach().cpu().numpy(),
                                                          self.speech_tokenizer_session.get_inputs()[1].name:
                                                          np.array([feat.shape[2]], dtype=np.int32)})[0].flatten().tolist()
        speech_token = torch.tensor([speech_token], dtype=torch.int32).to(self.device)
        speech_token_len = torch.tensor([speech_token.shape[1]], dtype=torch.int32).to(self.device)
        return speech_token, speech_token_len

    def _extract_spk_embedding(self, prompt_wav):
        speech = load_wav(prompt_wav, 16000)
        feat = kaldi.fbank(speech,
                           num_mel_bins=80,
                           dither=0,
                           sample_frequency=16000)
        feat = feat - feat.mean(dim=0, keepdim=True)
        embedding = self.campplus_session.run(None,
                                              {self.campplus_session.get_inputs()[0].name: feat.unsqueeze(dim=0).cpu().numpy()})[0].flatten().tolist()
        embedding = torch.tensor([embedding]).to(self.device)
        return embedding

    def _extract_speech_feat(self, prompt_wav):
        speech = load_wav(prompt_wav, 24000)
        speech_feat = self.feat_extractor(speech).squeeze(dim=0).transpose(0, 1).to(self.device)
        speech_feat = speech_feat.unsqueeze(dim=0)
        speech_feat_len = torch.tensor([speech_feat.shape[1]], dtype=torch.int32).to(self.device)
        return speech_feat, speech_feat_len
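
    # Taken together, the three speech helpers above turn one prompt recording
    # into the model's conditioning signals: discrete speech tokens from a
    # 16 kHz Whisper log-mel spectrogram, a CAMPPlus speaker embedding from
    # 16 kHz kaldi fbank features, and continuous speech features from 24 kHz audio.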

    def text_normalize(self, text, split=True, text_frontend=True):
        if isinstance(text, Generator):
            logging.info('get tts_text generator, will skip text_normalize!')
            return [text]
        # NOTE skip text_frontend when an ssml symbol appears in text
        if '<|' in text and '|>' in text:
            text_frontend = False
        if text_frontend is False or text == '':
            return [text] if split is True else text
        text = text.strip()
        if self.text_frontend == 'ttsfrd':
            texts = [i["text"] for i in json.loads(self.frd.do_voicegen_frd(text))["sentences"]]
            text = ''.join(texts)
        else:
            if contains_chinese(text):
                if self.text_frontend == 'wetext':
                    text = self.zh_tn_model.normalize(text)
                text = text.replace("\n", "")
                text = replace_blank(text)
                text = replace_corner_mark(text)
                text = text.replace(".", "。")
                text = text.replace(" - ", ",")
                text = remove_bracket(text)
                text = re.sub(r'[,,、]+$', '。', text)
                texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "zh", token_max_n=80,
                                             token_min_n=60, merge_len=20, comma_split=False))
            else:
                if self.text_frontend == 'wetext':
                    text = self.en_tn_model.normalize(text)
                text = spell_out_number(text, self.inflect_parser)
                texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "en", token_max_n=80,
                                             token_min_n=60, merge_len=20, comma_split=False))
        texts = [i for i in texts if not is_only_punctuation(i)]
        return texts if split is True else text
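
    # text_normalize is the entry point callers run before synthesis: with
    # split=True it returns a list of normalized sentence chunks, with
    # split=False the normalized string unsplit.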

    def frontend_sft(self, tts_text, spk_id):
        tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
        embedding = self.spk2info[spk_id]['embedding']
        model_input = {'text': tts_text_token, 'text_len': tts_text_token_len, 'llm_embedding': embedding, 'flow_embedding': embedding}
        return model_input

    def frontend_zero_shot(self, tts_text, prompt_text, prompt_wav, resample_rate, zero_shot_spk_id):
        tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
        if zero_shot_spk_id == '':
            prompt_text_token, prompt_text_token_len = self._extract_text_token(prompt_text)
            speech_feat, speech_feat_len = self._extract_speech_feat(prompt_wav)
            speech_token, speech_token_len = self._extract_speech_token(prompt_wav)
            if resample_rate == 24000:
                # cosyvoice2: force speech_feat length to be exactly 2x speech_token length
                token_len = min(int(speech_feat.shape[1] / 2), speech_token.shape[1])
                speech_feat, speech_feat_len[:] = speech_feat[:, :2 * token_len], 2 * token_len
                speech_token, speech_token_len[:] = speech_token[:, :token_len], token_len
            embedding = self._extract_spk_embedding(prompt_wav)
            model_input = {'prompt_text': prompt_text_token, 'prompt_text_len': prompt_text_token_len,
                           'llm_prompt_speech_token': speech_token, 'llm_prompt_speech_token_len': speech_token_len,
                           'flow_prompt_speech_token': speech_token, 'flow_prompt_speech_token_len': speech_token_len,
                           'prompt_speech_feat': speech_feat, 'prompt_speech_feat_len': speech_feat_len,
                           'llm_embedding': embedding, 'flow_embedding': embedding}
        else:
            model_input = {**self.spk2info[zero_shot_spk_id]}
        model_input['text'] = tts_text_token
        model_input['text_len'] = tts_text_token_len
        return model_input
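
    # frontend_zero_shot also serves as the base for the cross-lingual and
    # instruct2 modes below, which start from its output and delete the prompt
    # entries the llm should not see.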

    def frontend_cross_lingual(self, tts_text, prompt_wav, resample_rate, zero_shot_spk_id):
        model_input = self.frontend_zero_shot(tts_text, '', prompt_wav, resample_rate, zero_shot_spk_id)
        # in cross-lingual mode, remove the prompt from the llm input
        del model_input['prompt_text']
        del model_input['prompt_text_len']
        del model_input['llm_prompt_speech_token']
        del model_input['llm_prompt_speech_token_len']
        return model_input

    def frontend_instruct(self, tts_text, spk_id, instruct_text):
        model_input = self.frontend_sft(tts_text, spk_id)
        # in instruct mode, remove spk_embedding from the llm input to avoid information leakage
        del model_input['llm_embedding']
        instruct_text_token, instruct_text_token_len = self._extract_text_token(instruct_text)
        model_input['prompt_text'] = instruct_text_token
        model_input['prompt_text_len'] = instruct_text_token_len
        return model_input

    def frontend_instruct2(self, tts_text, instruct_text, prompt_wav, resample_rate, zero_shot_spk_id):
        model_input = self.frontend_zero_shot(tts_text, instruct_text, prompt_wav, resample_rate, zero_shot_spk_id)
        # in instruct2 mode, remove the speech-token prompt from the llm input
        del model_input['llm_prompt_speech_token']
        del model_input['llm_prompt_speech_token_len']
        return model_input

    def frontend_vc(self, source_speech_16k, prompt_wav, resample_rate):
        prompt_speech_token, prompt_speech_token_len = self._extract_speech_token(prompt_wav)
        prompt_speech_feat, prompt_speech_feat_len = self._extract_speech_feat(prompt_wav)
        embedding = self._extract_spk_embedding(prompt_wav)
        source_speech_token, source_speech_token_len = self._extract_speech_token(source_speech_16k)
        model_input = {'source_speech_token': source_speech_token, 'source_speech_token_len': source_speech_token_len,
                       'flow_prompt_speech_token': prompt_speech_token, 'flow_prompt_speech_token_len': prompt_speech_token_len,
                       'prompt_speech_feat': prompt_speech_feat, 'prompt_speech_feat_len': prompt_speech_feat_len,
                       'flow_embedding': embedding}
        return model_input
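

# A minimal usage sketch (not part of the module above). The factory callables
# and file paths are placeholders: in CosyVoice they normally come from the
# model's yaml config, so only the frontend calls themselves mirror this file.
#
#     frontend = CosyVoiceFrontEnd(get_tokenizer=my_get_tokenizer,          # placeholder factory
#                                  feat_extractor=my_feat_extractor,        # placeholder factory
#                                  campplus_model='campplus.onnx',          # placeholder path
#                                  speech_tokenizer_model='speech_tokenizer.onnx',
#                                  spk2info='spk2info.pt')
#     for chunk in frontend.text_normalize(tts_text, split=True):
#         model_input = frontend.frontend_zero_shot(chunk, prompt_text, 'prompt.wav', 24000, '')
#         # feed model_input to the llm / flow / hift stack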