|
@@ -12,7 +12,7 @@
|
|
|
# See the License for the specific language governing permissions and
|
|
# See the License for the specific language governing permissions and
|
|
|
# limitations under the License.
|
|
# limitations under the License.
|
|
|
from functools import partial
|
|
from functools import partial
|
|
|
-from typing import Generator
|
|
|
|
|
|
|
+from typing import Generator, Optional
|
|
|
import json
|
|
import json
|
|
|
import onnxruntime
|
|
import onnxruntime
|
|
|
import torch
|
|
import torch
|
|
@@ -24,6 +24,8 @@ import torchaudio
|
|
|
import os
|
|
import os
|
|
|
import re
|
|
import re
|
|
|
import inflect
|
|
import inflect
|
|
|
|
|
+from pydantic import BaseModel, ConfigDict
|
|
|
|
|
+
|
|
|
try:
|
|
try:
|
|
|
import ttsfrd
|
|
import ttsfrd
|
|
|
use_ttsfrd = True
|
|
use_ttsfrd = True
|
|
@@ -36,6 +38,18 @@ from cosyvoice.utils.file_utils import logging
|
|
|
from cosyvoice.utils.frontend_utils import contains_chinese, replace_blank, replace_corner_mark, remove_bracket, spell_out_number, split_paragraph, is_only_punctuation
|
|
from cosyvoice.utils.frontend_utils import contains_chinese, replace_blank, replace_corner_mark, remove_bracket, spell_out_number, split_paragraph, is_only_punctuation
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+class SpeakerInfo(BaseModel):
|
|
|
|
|
+ model_config = ConfigDict(arbitrary_types_allowed=True)
|
|
|
|
|
+
|
|
|
|
|
+ name: Optional[str] = None
|
|
|
|
|
+ spk_id: str
|
|
|
|
|
+ prompt_text: str
|
|
|
|
|
+ prompt_text_token: torch.Tensor
|
|
|
|
|
+ speech_feat: torch.Tensor
|
|
|
|
|
+ speech_token: torch.Tensor
|
|
|
|
|
+ embedding: torch.Tensor
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
class CosyVoiceFrontEnd:
|
|
class CosyVoiceFrontEnd:
|
|
|
|
|
|
|
|
def __init__(self,
|
|
def __init__(self,
|
|
@@ -55,8 +69,9 @@ class CosyVoiceFrontEnd:
|
|
|
self.speech_tokenizer_session = onnxruntime.InferenceSession(speech_tokenizer_model, sess_options=option,
|
|
self.speech_tokenizer_session = onnxruntime.InferenceSession(speech_tokenizer_model, sess_options=option,
|
|
|
providers=["CUDAExecutionProvider" if torch.cuda.is_available() else
|
|
providers=["CUDAExecutionProvider" if torch.cuda.is_available() else
|
|
|
"CPUExecutionProvider"])
|
|
"CPUExecutionProvider"])
|
|
|
|
|
+ self.spk2info_path = spk2info
|
|
|
if os.path.exists(spk2info):
|
|
if os.path.exists(spk2info):
|
|
|
- self.spk2info = torch.load(spk2info, map_location=self.device)
|
|
|
|
|
|
|
+ self.spk2info = torch.load(spk2info, map_location=self.device, weights_only=False)
|
|
|
else:
|
|
else:
|
|
|
self.spk2info = {}
|
|
self.spk2info = {}
|
|
|
self.allowed_special = allowed_special
|
|
self.allowed_special = allowed_special
|
|
@@ -68,7 +83,8 @@ class CosyVoiceFrontEnd:
|
|
|
'failed to initialize ttsfrd resource'
|
|
'failed to initialize ttsfrd resource'
|
|
|
self.frd.set_lang_type('pinyinvg')
|
|
self.frd.set_lang_type('pinyinvg')
|
|
|
else:
|
|
else:
|
|
|
- self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, overwrite_cache=True)
|
|
|
|
|
|
|
+ # self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, overwrite_cache=True)
|
|
|
|
|
+ self.zh_tn_model = ZhNormalizer(remove_erhua=False, full_to_half=False, overwrite_cache=False)
|
|
|
self.en_tn_model = EnNormalizer()
|
|
self.en_tn_model = EnNormalizer()
|
|
|
self.inflect_parser = inflect.engine()
|
|
self.inflect_parser = inflect.engine()
|
|
|
|
|
|
|
@@ -86,8 +102,9 @@ class CosyVoiceFrontEnd:
|
|
|
def _extract_text_token_generator(self, text_generator):
|
|
def _extract_text_token_generator(self, text_generator):
|
|
|
for text in text_generator:
|
|
for text in text_generator:
|
|
|
text_token, _ = self._extract_text_token(text)
|
|
text_token, _ = self._extract_text_token(text)
|
|
|
- for i in range(text_token.shape[1]):
|
|
|
|
|
- yield text_token[:, i: i + 1]
|
|
|
|
|
|
|
+ # for i in range(text_token.shape[1]):
|
|
|
|
|
+ # yield text_token[:, i: i + 1]
|
|
|
|
|
+ yield text_token
|
|
|
|
|
|
|
|
def _extract_speech_token(self, speech):
|
|
def _extract_speech_token(self, speech):
|
|
|
assert speech.shape[1] / 16000 <= 30, 'do not support extract speech token for audio longer than 30s'
|
|
assert speech.shape[1] / 16000 <= 30, 'do not support extract speech token for audio longer than 30s'
|
|
@@ -138,11 +155,15 @@ class CosyVoiceFrontEnd:
|
|
|
text = text.replace(" - ", ",")
|
|
text = text.replace(" - ", ",")
|
|
|
text = remove_bracket(text)
|
|
text = remove_bracket(text)
|
|
|
text = re.sub(r'[,,、]+$', '。', text)
|
|
text = re.sub(r'[,,、]+$', '。', text)
|
|
|
|
|
+ if not split:
|
|
|
|
|
+ return text
|
|
|
texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "zh", token_max_n=80,
|
|
texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "zh", token_max_n=80,
|
|
|
token_min_n=60, merge_len=20, comma_split=False))
|
|
token_min_n=60, merge_len=20, comma_split=False))
|
|
|
else:
|
|
else:
|
|
|
text = self.en_tn_model.normalize(text)
|
|
text = self.en_tn_model.normalize(text)
|
|
|
text = spell_out_number(text, self.inflect_parser)
|
|
text = spell_out_number(text, self.inflect_parser)
|
|
|
|
|
+ if not split:
|
|
|
|
|
+ return text
|
|
|
texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "en", token_max_n=80,
|
|
texts = list(split_paragraph(text, partial(self.tokenizer.encode, allowed_special=self.allowed_special), "en", token_max_n=80,
|
|
|
token_min_n=60, merge_len=20, comma_split=False))
|
|
token_min_n=60, merge_len=20, comma_split=False))
|
|
|
texts = [i for i in texts if not is_only_punctuation(i)]
|
|
texts = [i for i in texts if not is_only_punctuation(i)]
|
|
@@ -151,6 +172,7 @@ class CosyVoiceFrontEnd:
|
|
|
def frontend_sft(self, tts_text, spk_id):
|
|
def frontend_sft(self, tts_text, spk_id):
|
|
|
tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
|
|
tts_text_token, tts_text_token_len = self._extract_text_token(tts_text)
|
|
|
embedding = self.spk2info[spk_id]['embedding']
|
|
embedding = self.spk2info[spk_id]['embedding']
|
|
|
|
|
+ assert embedding is not None
|
|
|
model_input = {'text': tts_text_token, 'text_len': tts_text_token_len, 'llm_embedding': embedding, 'flow_embedding': embedding}
|
|
model_input = {'text': tts_text_token, 'text_len': tts_text_token_len, 'llm_embedding': embedding, 'flow_embedding': embedding}
|
|
|
return model_input
|
|
return model_input
|
|
|
|
|
|
|
@@ -209,3 +231,60 @@ class CosyVoiceFrontEnd:
|
|
|
'prompt_speech_feat': prompt_speech_feat, 'prompt_speech_feat_len': prompt_speech_feat_len,
|
|
'prompt_speech_feat': prompt_speech_feat, 'prompt_speech_feat_len': prompt_speech_feat_len,
|
|
|
'flow_embedding': embedding}
|
|
'flow_embedding': embedding}
|
|
|
return model_input
|
|
return model_input
|
|
|
|
|
+
|
|
|
|
|
+ def generate_spk_info(self, spk_id: str, prompt_text: str, prompt_speech_16k: torch.Tensor, resample_rate:int=24000, name: str=None):
|
|
|
|
|
+ assert isinstance(spk_id, str)
|
|
|
|
|
+ assert spk_id not in self.spk2info, "spk_id already exists"
|
|
|
|
|
+ prompt_text_token, _ = self._extract_text_token(prompt_text)
|
|
|
|
|
+ prompt_speech_resample = torchaudio.transforms.Resample(orig_freq=16000, new_freq=resample_rate)(prompt_speech_16k)
|
|
|
|
|
+ speech_feat, _ = self._extract_speech_feat(prompt_speech_resample)
|
|
|
|
|
+ speech_token, speech_token_len = self._extract_speech_token(prompt_speech_16k)
|
|
|
|
|
+ if resample_rate == 24000:
|
|
|
|
|
+ # cosyvoice2, force speech_feat % speech_token = 2
|
|
|
|
|
+ token_len = min(int(speech_feat.shape[1] / 2), speech_token.shape[1])
|
|
|
|
|
+ speech_feat = speech_feat[:, :2 * token_len]
|
|
|
|
|
+ speech_token = speech_token[:, :token_len]
|
|
|
|
|
+ embedding = self._extract_spk_embedding(prompt_speech_16k)
|
|
|
|
|
+ spk_info = SpeakerInfo(
|
|
|
|
|
+ name=name,
|
|
|
|
|
+ spk_id=spk_id,
|
|
|
|
|
+ prompt_text=prompt_text,
|
|
|
|
|
+ prompt_text_token=prompt_text_token,
|
|
|
|
|
+ speech_feat=speech_feat,
|
|
|
|
|
+ speech_token=speech_token,
|
|
|
|
|
+ embedding=embedding,
|
|
|
|
|
+ )
|
|
|
|
|
+ self.add_spk_info(spk_id, spk_info)
|
|
|
|
|
+
|
|
|
|
|
+ def add_spk_info(self, spk_id: str, spk_info: dict|SpeakerInfo):
|
|
|
|
|
+ if isinstance(spk_info, BaseModel):
|
|
|
|
|
+ spk_info = spk_info.model_dump()
|
|
|
|
|
+ self.spk2info[spk_id] = spk_info
|
|
|
|
|
+ if self.spk2info_path:
|
|
|
|
|
+ torch.save(self.spk2info, self.spk2info_path)
|
|
|
|
|
+
|
|
|
|
|
+ def frontend_instruct2_by_spk_id(self, tts_text, instruct_text, spk_id):
|
|
|
|
|
+ assert spk_id in self.spk2info
|
|
|
|
|
+ tts_text_token, _ = self._extract_text_token(tts_text)
|
|
|
|
|
+ prompt_text_token, _ = self._extract_text_token(instruct_text + '<|endofprompt|>')
|
|
|
|
|
+ model_input = {'text': tts_text_token,
|
|
|
|
|
+ 'prompt_text': prompt_text_token,
|
|
|
|
|
+ 'flow_prompt_speech_token': self.spk2info[spk_id]['speech_token'],
|
|
|
|
|
+ 'prompt_speech_feat': self.spk2info[spk_id]['speech_feat'],
|
|
|
|
|
+ 'llm_embedding': self.spk2info[spk_id]['embedding'],
|
|
|
|
|
+ 'flow_embedding': self.spk2info[spk_id]['embedding'],
|
|
|
|
|
+ }
|
|
|
|
|
+ return model_input
|
|
|
|
|
+
|
|
|
|
|
+ def frontend_zero_shot_by_spk_id(self, tts_text, spk_id):
|
|
|
|
|
+ assert spk_id in self.spk2info
|
|
|
|
|
+ tts_text_token, _ = self._extract_text_token(tts_text)
|
|
|
|
|
+ model_input = {'text': tts_text_token,
|
|
|
|
|
+ 'prompt_text': self.spk2info[spk_id]['prompt_text_token'],
|
|
|
|
|
+ 'llm_prompt_speech_token': self.spk2info[spk_id]['speech_token'],
|
|
|
|
|
+ 'flow_prompt_speech_token': self.spk2info[spk_id]['speech_token'],
|
|
|
|
|
+ 'prompt_speech_feat': self.spk2info[spk_id]['speech_feat'],
|
|
|
|
|
+ 'llm_embedding': self.spk2info[spk_id]['embedding'],
|
|
|
|
|
+ 'flow_embedding': self.spk2info[spk_id]['embedding']
|
|
|
|
|
+ }
|
|
|
|
|
+ return model_input
|