frontend.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # Copyright (c) 2024 Alibaba Inc (authors: Xiang Lyu)
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from functools import partial
  15. from typing import Generator, Optional
  16. import json
  17. import onnxruntime
  18. import torch
  19. import numpy as np
  20. import whisper
  21. from typing import Callable
  22. import torchaudio.compliance.kaldi as kaldi
  23. import torchaudio
  24. import os
  25. import re
  26. import inflect
  27. from pydantic import BaseModel, ConfigDict
  28. try:
  29. import ttsfrd
  30. use_ttsfrd = True
  31. except ImportError:
  32. print("failed to import ttsfrd, use WeTextProcessing instead")
  33. from tn.chinese.normalizer import Normalizer as ZhNormalizer
  34. from tn.english.normalizer import Normalizer as EnNormalizer
  35. use_ttsfrd = False
  36. from cosyvoice.utils.file_utils import logging
  37. from cosyvoice.utils.frontend_utils import contains_chinese, replace_blank, replace_corner_mark, remove_bracket, spell_out_number, split_paragraph, is_only_punctuation
  38. class SpeakerInfo(BaseModel):
  39. model_config = ConfigDict(arbitrary_types_allowed=True)
  40. name: Optional[str] = None
  41. spk_id: str
  42. prompt_text: str
  43. prompt_text_token: torch.Tensor
  44. speech_feat: torch.Tensor
  45. speech_token: torch.Tensor
  46. embedding: torch.Tensor
  47. class CosyVoiceFrontEnd:
  48. def __init__(self,
  49. get_tokenizer: Callable,
  50. feat_extractor: Callable,
  51. campplus_model: str,
  52. speech_tokenizer_model: str,
  53. spk2info: str = '',
  54. allowed_special: str = 'all'):
  55. self.tokenizer = get_tokenizer()
  56. self.feat_extractor = feat_extractor
  57. self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
  58. option = onnxruntime.SessionOptions()
  59. option.graph_optimization_level = onnxruntime.GraphOptimizationLevel.ORT_ENABLE_ALL
  60. option.intra_op_num_threads = 1
  61. self.campplus_session = onnxruntime.InferenceSession(campplus_model, sess_options=option, providers=["CPUExecutionProvider"])
  62. self.speech_tokenizer_session = onnxruntime.InferenceSession(speech_tokenizer_model, sess_options=option,
  63. providers=["CUDAExecutionProvider" if torch.cuda.is_available() else
  64. "CPUExecutionProvider"])
  65. self.spk2info_path = spk2info
  66. if os.path.exists(spk2info):
  67. self.spk2info = torch.load(spk2info, map_location=self.device, weights_only=False)
  68. else:
  69. self.spk2info = {}
  70. self.allowed_special = allowed_special
  71. self.use_ttsfrd = use_ttsfrd
  72. if self.use_ttsfrd:
  73. self.frd = ttsfrd.TtsFrontendEngine()
  74. ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
  75. assert self.frd.initialize('{}/../../pretrained_models/CosyVoice-ttsfrd/resource'.format(ROOT_DIR)) is True, \
  76. 'failed to initialize ttsfrd resource'
  77. self.frd.set_lang_type('pinyinvg')
  78. else:
  79. # self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, overwrite_cache=True)
  80. self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, overwrite_cache=False)
  81. self.en_tn_model = EnNormalizer()
  82. self.inflect_parser = inflect.engine()
  83. def _extract_text_token(self, text):
  84. if isinstance(text, Generator):
  85. logging.info('get tts_text generator, will return _extract_text_token_generator!')
  86. # NOTE add a dummy text_token_len for compatibility
  87. return self._extract_text_token_generator(text), torch.tensor([0], dtype=torch.int32).to(self.device)
  88. else:
  89. text_token = self.tokenizer.encode(text, allowed_special=self.allowed_special)
  90. text_token = torch.tensor([text_token], dtype=torch.int32).to(self.device)
  91. text_token_len = torch.tensor([text_token.shape[1]], dtype=torch.int32).to(self.device)
  92. return text_token, text_token_len
  93. def _extract_text_token_generator(self, text_generator):
  94. for text in text_generator:
  95. text_token, _ = self._extract_text_token(text)
  96. # for i in range(text_token.shape[1]):
  97. # yield text_token[:, i: i + 1]
  98. yield text_token
  99. def _extract_speech_token(self, speech):
  100. assert speech.shape[1] / 16000 <= 30, 'do not support extract speech token for audio longer than 30s'
  101. feat = whisper.log_mel_spectrogram(speech, n_mels=128)
  102. speech_token = self.speech_tokenizer_session.run(None,
  103. {self.speech_tokenizer_session.get_inputs()[0].name:
  104. feat.detach().cpu().numpy(),
  105. self.speech_tokenizer_session.get_inputs()[1].name:
  106. np.array([feat.shape[2]], dtype=np.int32)})[0].flatten().tolist()
  107. speech_token = torch.tensor([speech_token], dtype=torch.int32).to(self.device)
  108. speech_token_len = torch.tensor([speech_token.shape[1]], dtype=torch.int32).to(self.device)
  109. return speech_token, speech_token_len
  110. def _extract_spk_embedding(self, speech):
  111. feat = kaldi.fbank(speech,
  112. num_mel_bins=80,
  113. dither=0,
  114. sample_frequency=16000)
  115. feat = feat - feat.mean(dim=0, keepdim=True)
  116. embedding = self.campplus_session.run(None,
  117. {self.campplus_session.get_inputs()[0].name: feat.unsqueeze(dim=0).cpu().numpy()})[0].flatten().tolist()
  118. embedding = torch.tensor([embedding]).to(self.device)
  119. return embedding
  120. def _extract_speech_feat(self, speech):
  121. speech_feat = self.feat_extractor(speech).squeeze(dim=0).transpose(0, 1).to(self.device)
  122. speech_feat = speech_feat.unsqueeze(dim=0)
  123. speech_feat_len = torch.tensor([speech_feat.shape[1]], dtype=torch.int32).to(self.device)
  124. return speech_feat, speech_feat_len
  125. def text_normalize(self, text, split=True, text_frontend=True):
  126. if isinstance(text, Generator):
  127. logging.info('get tts_text generator, will skip text_normalize!')
  128. return [text]
  129. if text_frontend is False:
  130. return [text] if split is True else text
  131. text = text.strip()
  132. if self.use_ttsfrd:
  133. texts = [i["text"] for i in json.loads(self.frd.do_voicegen_frd(text))["sentences"]]
  134. text = ''.join(texts)
  135. else:
  136. if contains_chinese(text):
  137. text = self.zh_tn_model.normalize(text)
  138. text = text.replace("\n", "")
  139. text = replace_blank(text)
  140. text = replace_corner_mark(text)
  141. text = text.replace(".", "。")
  142. text = text.replace(" - ", ",")
  143. text = remove_bracket(text)
  144. text = re.sub(r'[,,、]+$', '。', text)
  145. if not split:
  146. return text
  147. texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "zh", token_max_n=80,
  148. token_min_n=60, merge_len=20, comma_split=False))
  149. else:
  150. text = self.en_tn_model.normalize(text)
  151. text = spell_out_number(text, self.inflect_parser)
  152. if not split:
  153. return text
  154. texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "en", token_max_n=80,
  155. token_min_n=60, merge_len=20, comma_split=False))
  156. texts = [i for i in texts if not is_only_punctuation(i)]
  157. return texts if split is True else text
  158. def frontend_sft(self, tts_text, spk_id):
  159. tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
  160. embedding = self.spk2info[spk_id]['embedding']
  161. assert embedding is not None
  162. model_input = {'text': tts_text_token, 'text_len': tts_text_token_len, 'llm_embedding': embedding, 'flow_embedding': embedding}
  163. return model_input
  164. def frontend_zero_shot(self, tts_text, prompt_text, prompt_speech_16k, resample_rate):
  165. tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
  166. prompt_text_token, prompt_text_token_len = self._extract_text_token(prompt_text)
  167. prompt_speech_resample = torchaudio.transforms.Resample(orig_freq=16000, new_freq=resample_rate)(prompt_speech_16k)
  168. speech_feat, speech_feat_len = self._extract_speech_feat(prompt_speech_resample)
  169. speech_token, speech_token_len = self._extract_speech_token(prompt_speech_16k)
  170. if resample_rate == 24000:
  171. # cosyvoice2, force speech_feat % speech_token = 2
  172. token_len = min(int(speech_feat.shape[1] / 2), speech_token.shape[1])
  173. speech_feat, speech_feat_len[:] = speech_feat[:, :2 * token_len], 2 * token_len
  174. speech_token, speech_token_len[:] = speech_token[:, :token_len], token_len
  175. embedding = self._extract_spk_embedding(prompt_speech_16k)
  176. model_input = {'text': tts_text_token, 'text_len': tts_text_token_len,
  177. 'prompt_text': prompt_text_token, 'prompt_text_len': prompt_text_token_len,
  178. 'llm_prompt_speech_token': speech_token, 'llm_prompt_speech_token_len': speech_token_len,
  179. 'flow_prompt_speech_token': speech_token, 'flow_prompt_speech_token_len': speech_token_len,
  180. 'prompt_speech_feat': speech_feat, 'prompt_speech_feat_len': speech_feat_len,
  181. 'llm_embedding': embedding, 'flow_embedding': embedding}
  182. return model_input
  183. def frontend_cross_lingual(self, tts_text, prompt_speech_16k, resample_rate):
  184. model_input = self.frontend_zero_shot(tts_text, '', prompt_speech_16k, resample_rate)
  185. # in cross lingual mode, we remove prompt in llm
  186. del model_input['prompt_text']
  187. del model_input['prompt_text_len']
  188. del model_input['llm_prompt_speech_token']
  189. del model_input['llm_prompt_speech_token_len']
  190. return model_input
  191. def frontend_instruct(self, tts_text, spk_id, instruct_text):
  192. model_input = self.frontend_sft(tts_text, spk_id)
  193. # in instruct mode, we remove spk_embedding in llm due to information leakage
  194. del model_input['llm_embedding']
  195. instruct_text_token, instruct_text_token_len = self._extract_text_token(instruct_text + '<endofprompt>')
  196. model_input['prompt_text'] = instruct_text_token
  197. model_input['prompt_text_len'] = instruct_text_token_len
  198. return model_input
  199. def frontend_instruct2(self, tts_text, instruct_text, prompt_speech_16k, resample_rate):
  200. model_input = self.frontend_zero_shot(tts_text, instruct_text + '<|endofprompt|>', prompt_speech_16k, resample_rate)
  201. del model_input['llm_prompt_speech_token']
  202. del model_input['llm_prompt_speech_token_len']
  203. return model_input
  204. def frontend_vc(self, source_speech_16k, prompt_speech_16k, resample_rate):
  205. prompt_speech_token, prompt_speech_token_len = self._extract_speech_token(prompt_speech_16k)
  206. prompt_speech_resample = torchaudio.transforms.Resample(orig_freq=16000, new_freq=resample_rate)(prompt_speech_16k)
  207. prompt_speech_feat, prompt_speech_feat_len = self._extract_speech_feat(prompt_speech_resample)
  208. embedding = self._extract_spk_embedding(prompt_speech_16k)
  209. source_speech_token, source_speech_token_len = self._extract_speech_token(source_speech_16k)
  210. model_input = {'source_speech_token': source_speech_token, 'source_speech_token_len': source_speech_token_len,
  211. 'flow_prompt_speech_token': prompt_speech_token, 'flow_prompt_speech_token_len': prompt_speech_token_len,
  212. 'prompt_speech_feat': prompt_speech_feat, 'prompt_speech_feat_len': prompt_speech_feat_len,
  213. 'flow_embedding': embedding}
  214. return model_input
  215. def generate_spk_info(self, spk_id: str, prompt_text: str, prompt_speech_16k: torch.Tensor, resample_rate:int=24000, name: str=None):
  216. assert isinstance(spk_id, str)
  217. assert spk_id not in self.spk2info, "spk_id already exists"
  218. prompt_text_token, _ = self._extract_text_token(prompt_text)
  219. prompt_speech_resample = torchaudio.transforms.Resample(orig_freq=16000, new_freq=resample_rate)(prompt_speech_16k)
  220. speech_feat, _ = self._extract_speech_feat(prompt_speech_resample)
  221. speech_token, speech_token_len = self._extract_speech_token(prompt_speech_16k)
  222. if resample_rate == 24000:
  223. # cosyvoice2, force speech_feat % speech_token = 2
  224. token_len = min(int(speech_feat.shape[1] / 2), speech_token.shape[1])
  225. speech_feat = speech_feat[:, :2 * token_len]
  226. speech_token = speech_token[:, :token_len]
  227. embedding = self._extract_spk_embedding(prompt_speech_16k)
  228. spk_info = SpeakerInfo(
  229. name=name,
  230. spk_id=spk_id,
  231. prompt_text=prompt_text,
  232. prompt_text_token=prompt_text_token,
  233. speech_feat=speech_feat,
  234. speech_token=speech_token,
  235. embedding=embedding,
  236. )
  237. self.add_spk_info(spk_id, spk_info)
  238. def add_spk_info(self, spk_id: str, spk_info: dict|SpeakerInfo):
  239. if isinstance(spk_info, BaseModel):
  240. spk_info = spk_info.model_dump()
  241. self.spk2info[spk_id] = spk_info
  242. if self.spk2info_path:
  243. torch.save(self.spk2info, self.spk2info_path)
  244. def frontend_instruct2_by_spk_id(self, tts_text, instruct_text, spk_id):
  245. assert spk_id in self.spk2info
  246. tts_text_token, _ = self._extract_text_token(tts_text)
  247. prompt_text_token, _ = self._extract_text_token(instruct_text + '<|endofprompt|>')
  248. model_input = {'text': tts_text_token,
  249. 'prompt_text': prompt_text_token,
  250. 'flow_prompt_speech_token': self.spk2info[spk_id]['speech_token'],
  251. 'prompt_speech_feat': self.spk2info[spk_id]['speech_feat'],
  252. 'llm_embedding': self.spk2info[spk_id]['embedding'],
  253. 'flow_embedding': self.spk2info[spk_id]['embedding'],
  254. }
  255. return model_input
  256. def frontend_zero_shot_by_spk_id(self, tts_text, spk_id):
  257. assert spk_id in self.spk2info
  258. tts_text_token, _ = self._extract_text_token(tts_text)
  259. model_input = {'text': tts_text_token,
  260. 'prompt_text': self.spk2info[spk_id]['prompt_text_token'],
  261. 'llm_prompt_speech_token': self.spk2info[spk_id]['speech_token'],
  262. 'flow_prompt_speech_token': self.spk2info[spk_id]['speech_token'],
  263. 'prompt_speech_feat': self.spk2info[spk_id]['speech_feat'],
  264. 'llm_embedding': self.spk2info[spk_id]['embedding'],
  265. 'flow_embedding': self.spk2info[spk_id]['embedding']
  266. }
  267. return model_input