# Copyright (c) 2021 Mobvoi Inc. (authors: Binbin Zhang)
#               2023 Horizon Inc. (authors: Xingchen Song)
#               2024 Alibaba Inc (authors: Xiang Lyu)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import torch
import json
import re
import datetime
import yaml

import deepspeed
import torch.optim as optim
import torch.distributed as dist

from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
from torch.nn.utils import clip_grad_norm_

from deepspeed.runtime.zero.stage_1_and_2 import estimate_zero2_model_states_mem_needs_all_live

from cosyvoice.dataset.dataset import Dataset
from cosyvoice.utils.scheduler import WarmupLR, NoamHoldAnnealing, ConstantLR


def init_distributed(args):
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    rank = int(os.environ.get('RANK', 0))
    logging.info('training on multiple gpus, this gpu {}'.format(local_rank) +
                 ', rank {}, world_size {}'.format(rank, world_size))
    if args.train_engine == 'torch_ddp':
        torch.cuda.set_device(local_rank)
        dist.init_process_group(args.dist_backend)
    else:
        deepspeed.init_distributed(dist_backend=args.dist_backend)
    return world_size, local_rank, rank
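
# Usage note (illustrative, not part of the recipe): WORLD_SIZE / LOCAL_RANK / RANK are
# normally injected by the launcher, e.g. something like
#   torchrun --nnodes=1 --nproc_per_node=4 train.py --train_engine torch_ddp --dist_backend nccl
# so init_distributed() only has to read them and set up the process group.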


def init_dataset_and_dataloader(args, configs, gan):
    data_pipeline = configs['data_pipeline_gan'] if gan is True else configs['data_pipeline']
    train_dataset = Dataset(args.train_data, data_pipeline=data_pipeline, mode='train', gan=gan, shuffle=True, partition=True)
    cv_dataset = Dataset(args.cv_data, data_pipeline=data_pipeline, mode='train', gan=gan, shuffle=False, partition=False)

    # do not use persistent_workers=True, as the whisper tokenizer opens its tiktoken file
    # each time the for loop starts.
    # batch_size=None disables automatic batching: the Dataset pipeline already yields
    # ready-made batches.
    train_data_loader = DataLoader(train_dataset,
                                   batch_size=None,
                                   pin_memory=args.pin_memory,
                                   num_workers=args.num_workers,
                                   prefetch_factor=args.prefetch)
    cv_data_loader = DataLoader(cv_dataset,
                                batch_size=None,
                                pin_memory=args.pin_memory,
                                num_workers=args.num_workers,
                                prefetch_factor=args.prefetch)
    return train_dataset, cv_dataset, train_data_loader, cv_data_loader


def check_modify_and_save_config(args, configs):
    if args.train_engine == "torch_ddp":
        configs['train_conf']["dtype"] = 'fp32'
    else:
        with open(args.deepspeed_config, 'r') as fin:
            ds_configs = json.load(fin)
        if "fp16" in ds_configs and ds_configs["fp16"]["enabled"]:
            configs['train_conf']["dtype"] = "fp16"
        elif "bf16" in ds_configs and ds_configs["bf16"]["enabled"]:
            configs['train_conf']["dtype"] = "bf16"
        else:
            configs['train_conf']["dtype"] = "fp32"
        assert ds_configs["train_micro_batch_size_per_gpu"] == 1
        # when using deepspeed, override the ddp-style config with the deepspeed one
        configs['train_conf']['save_per_step'] = int(configs['train_conf']['save_per_step'] *
                                                     configs['train_conf']['accum_grad'] / ds_configs["gradient_accumulation_steps"])
        configs['train_conf']['accum_grad'] = ds_configs["gradient_accumulation_steps"]
        configs['train_conf']['grad_clip'] = ds_configs["gradient_clipping"]
        configs['train_conf']['log_interval'] = ds_configs["steps_per_print"]
    return configs
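
# A minimal deepspeed_config JSON consistent with the checks above (values are
# illustrative assumptions, not the shipped recipe):
#   {
#     "train_micro_batch_size_per_gpu": 1,
#     "gradient_accumulation_steps": 2,
#     "gradient_clipping": 5.0,
#     "steps_per_print": 100,
#     "bf16": {"enabled": true}
#   }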


def wrap_cuda_model(args, model):
    local_world_size = int(os.environ.get('LOCAL_WORLD_SIZE', 1))
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    if args.train_engine == "torch_ddp":  # native pytorch ddp
        assert (torch.cuda.is_available())
        model.cuda()
        model = torch.nn.parallel.DistributedDataParallel(model, find_unused_parameters=True)
    else:
        if int(os.environ.get('RANK', 0)) == 0:
            logging.info("Estimating model states memory needs (zero2)...")
            estimate_zero2_model_states_mem_needs_all_live(
                model,
                num_gpus_per_node=local_world_size,
                num_nodes=world_size // local_world_size)
    return model


def init_optimizer_and_scheduler(args, configs, model, gan):
    if gan is False:
        if configs['train_conf']['optim'] == 'adam':
            optimizer = optim.Adam(model.parameters(), **configs['train_conf']['optim_conf'])
        elif configs['train_conf']['optim'] == 'adamw':
            optimizer = optim.AdamW(model.parameters(), **configs['train_conf']['optim_conf'])
        else:
            raise ValueError("unknown optimizer: " + configs['train_conf']['optim'])

        if configs['train_conf']['scheduler'] == 'warmuplr':
            scheduler_type = WarmupLR
            scheduler = WarmupLR(optimizer, **configs['train_conf']['scheduler_conf'])
        elif configs['train_conf']['scheduler'] == 'NoamHoldAnnealing':
            scheduler_type = NoamHoldAnnealing
            scheduler = NoamHoldAnnealing(optimizer, **configs['train_conf']['scheduler_conf'])
        elif configs['train_conf']['scheduler'] == 'constantlr':
            scheduler_type = ConstantLR
            scheduler = ConstantLR(optimizer)
        else:
            raise ValueError("unknown scheduler: " + configs['train_conf']['scheduler'])

        # use deepspeed optimizer for speedup
        if args.train_engine == "deepspeed":
            def scheduler(opt):
                return scheduler_type(opt, **configs['train_conf']['scheduler_conf'])
            model, optimizer, _, scheduler = deepspeed.initialize(
                args=args,
                model=model,
                optimizer=None,
                lr_scheduler=scheduler,
                model_parameters=model.parameters())

        optimizer_d, scheduler_d = None, None
    else:
        # currently we wrap generator and discriminator in one model, so we cannot use deepspeed
        if configs['train_conf']['optim'] == 'adam':
            optimizer = optim.Adam(model.module.generator.parameters(), **configs['train_conf']['optim_conf'])
        elif configs['train_conf']['optim'] == 'adamw':
            optimizer = optim.AdamW(model.module.generator.parameters(), **configs['train_conf']['optim_conf'])
        else:
            raise ValueError("unknown optimizer: " + configs['train_conf']['optim'])

        if configs['train_conf']['scheduler'] == 'warmuplr':
            scheduler_type = WarmupLR
            scheduler = WarmupLR(optimizer, **configs['train_conf']['scheduler_conf'])
        elif configs['train_conf']['scheduler'] == 'NoamHoldAnnealing':
            scheduler_type = NoamHoldAnnealing
            scheduler = NoamHoldAnnealing(optimizer, **configs['train_conf']['scheduler_conf'])
        elif configs['train_conf']['scheduler'] == 'constantlr':
            scheduler_type = ConstantLR
            scheduler = ConstantLR(optimizer)
        else:
            raise ValueError("unknown scheduler: " + configs['train_conf']['scheduler'])

        if configs['train_conf']['optim_d'] == 'adam':
            optimizer_d = optim.Adam(model.module.discriminator.parameters(), **configs['train_conf']['optim_conf'])
        elif configs['train_conf']['optim_d'] == 'adamw':
            optimizer_d = optim.AdamW(model.module.discriminator.parameters(), **configs['train_conf']['optim_conf'])
        else:
            raise ValueError("unknown optimizer: " + configs['train_conf']['optim_d'])

        if configs['train_conf']['scheduler_d'] == 'warmuplr':
            scheduler_type = WarmupLR
            scheduler_d = WarmupLR(optimizer_d, **configs['train_conf']['scheduler_conf'])
        elif configs['train_conf']['scheduler_d'] == 'NoamHoldAnnealing':
            scheduler_type = NoamHoldAnnealing
            scheduler_d = NoamHoldAnnealing(optimizer_d, **configs['train_conf']['scheduler_conf'])
        elif configs['train_conf']['scheduler_d'] == 'constantlr':
            scheduler_type = ConstantLR
            scheduler_d = ConstantLR(optimizer_d)
        else:
            raise ValueError("unknown scheduler: " + configs['train_conf']['scheduler_d'])
    return model, optimizer, scheduler, optimizer_d, scheduler_d
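
# The keys consumed above all come from configs['train_conf']. An illustrative YAML
# fragment (values are assumptions, not the shipped recipe):
#   train_conf:
#     optim: adam
#     optim_conf:
#       lr: 0.001
#     scheduler: warmuplr
#     scheduler_conf:
#       warmup_steps: 2500
#     # only read for GAN training:
#     optim_d: adam
#     scheduler_d: warmuplr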


def init_summarywriter(args):
    writer = None
    if int(os.environ.get('RANK', 0)) == 0:
        os.makedirs(args.model_dir, exist_ok=True)
        writer = SummaryWriter(args.tensorboard_dir)
    return writer


def save_model(model, model_name, info_dict):
    rank = int(os.environ.get('RANK', 0))
    model_dir = info_dict["model_dir"]
    save_model_path = os.path.join(model_dir, '{}.pt'.format(model_name))

    if info_dict["train_engine"] == "torch_ddp":
        if rank == 0:
            torch.save({**model.module.state_dict(), 'epoch': info_dict['epoch'], 'step': info_dict['step']}, save_model_path)
    else:
        with torch.no_grad():
            model.save_checkpoint(save_dir=model_dir,
                                  tag=model_name,
                                  client_state=info_dict)
    if rank == 0:
        info_path = re.sub('.pt$', '.yaml', save_model_path)
        info_dict['save_time'] = datetime.datetime.now().strftime('%d/%m/%Y %H:%M:%S')
        with open(info_path, 'w') as fout:
            data = yaml.dump(info_dict)
            fout.write(data)
    logging.info('[Rank {}] Checkpoint: save to checkpoint {}'.format(rank, save_model_path))


def cosyvoice_join(group_join, info_dict):
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    rank = int(os.environ.get('RANK', 0))

    if info_dict["batch_idx"] != 0:
        # we try to join all ranks in both ddp and deepspeed mode, in case different ranks see different numbers of batches
        try:
            dist.monitored_barrier(group=group_join,
                                   timeout=group_join.options._timeout)
            return False
        except RuntimeError as e:
            logging.info("Detected uneven workload distribution: {}\n".format(e) +
                         "Break current worker to manually join all workers, " +
                         "world_size {}, current rank {}, current local_rank {}\n".format(
                             world_size, rank, local_rank))
            return True
    else:
        return False
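
# Note: dist.monitored_barrier() is only supported by the gloo backend, so group_join is
# expected to be a dedicated gloo group created by the caller, e.g. (assumption):
#   group_join = dist.new_group(backend="gloo", timeout=datetime.timedelta(seconds=args.timeout))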


def batch_forward(model, batch, scaler, info_dict, ref_model=None, dpo_loss=None):
    device = int(os.environ.get('LOCAL_RANK', 0))

    dtype = info_dict["dtype"]
    if dtype == "fp16":
        dtype = torch.float16
    elif dtype == "bf16":
        dtype = torch.bfloat16
    else:  # fp32
        dtype = torch.float32

    if info_dict['train_engine'] == 'torch_ddp':
        autocast = torch.cuda.amp.autocast(enabled=scaler is not None)
    else:
        autocast = torch.cuda.amp.autocast(enabled=True, dtype=dtype, cache_enabled=False)

    with autocast:
        info_dict['loss_dict'] = model(batch, device)
        if ref_model and dpo_loss:
            chosen_logps = info_dict['loss_dict']["chosen_logps"]
            rejected_logps = info_dict['loss_dict']["rejected_logps"]
            sft_loss = info_dict['loss_dict']['loss']
            with torch.no_grad():
                ref_model = ref_model.to(device)
                ref_loss_dict = ref_model(batch, device)
                reference_chosen_logps = ref_loss_dict["chosen_logps"]
                reference_rejected_logps = ref_loss_dict["rejected_logps"]
            preference_loss, chosen_reward, reject_reward = dpo_loss(
                chosen_logps, rejected_logps, reference_chosen_logps, reference_rejected_logps)
            dpo_acc = (chosen_reward > reject_reward).float().mean()
            info_dict['loss_dict']["loss"] = preference_loss + sft_loss
            info_dict['loss_dict']["sft_loss"] = sft_loss
            info_dict['loss_dict']["dpo_loss"] = preference_loss
            info_dict['loss_dict']["dpo_acc"] = dpo_acc
            info_dict['loss_dict']["chosen_reward"] = chosen_reward.mean()
            info_dict['loss_dict']["reject_reward"] = reject_reward.mean()
    return info_dict
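
# DPO contract implied by the code above: the model's loss_dict must expose
# 'chosen_logps' and 'rejected_logps', and dpo_loss is a callable returning
# (preference_loss, chosen_reward, rejected_reward); the training loss becomes the sum
# of the preference loss and the regular SFT loss.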


def batch_backward(model, scaler, info_dict):
    if info_dict["train_engine"] == "deepspeed":
        scaled_loss = model.backward(info_dict['loss_dict']['loss'])
    else:
        scaled_loss = info_dict['loss_dict']['loss'] / info_dict['accum_grad']
        if scaler is not None:
            scaler.scale(scaled_loss).backward()
        else:
            scaled_loss.backward()
    info_dict['loss_dict']['loss'] = scaled_loss
    return info_dict


def update_parameter_and_lr(model, optimizer, scheduler, scaler, info_dict):
    grad_norm = 0.0
    if info_dict['train_engine'] == "deepspeed":
        info_dict["is_gradient_accumulation_boundary"] = model.is_gradient_accumulation_boundary()
        model.step()
        grad_norm = model.get_global_grad_norm()
    elif (info_dict['batch_idx'] + 1) % info_dict["accum_grad"] == 0:
        # Use mixed precision training
        if scaler is not None:
            scaler.unscale_(optimizer)
            grad_norm = clip_grad_norm_(model.parameters(), info_dict['grad_clip'])
            # scaler.step() skips optimizer.step() on its own when the gradients
            # contain inf/nan values.
            if torch.isfinite(grad_norm):
                scaler.step(optimizer)
            scaler.update()
        else:
            grad_norm = clip_grad_norm_(model.parameters(), info_dict['grad_clip'])
            if torch.isfinite(grad_norm):
                optimizer.step()
        optimizer.zero_grad()
        scheduler.step()
    info_dict["lr"] = optimizer.param_groups[0]['lr']
    info_dict["grad_norm"] = grad_norm
    return info_dict
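
# The helpers above plus log_per_step below together implement one optimization step.
# _example_train_step is a minimal sketch of that wiring only (the function name and the
# writer=None default are assumptions for illustration; it is not called by the recipes).
def _example_train_step(model, optimizer, scheduler, scaler, batch, info_dict, writer=None):
    """Illustrative sketch: chain batch_forward, batch_backward,
    update_parameter_and_lr and log_per_step for a single batch. All state is
    carried through info_dict, whose keys ('train_engine', 'accum_grad',
    'batch_idx', 'grad_clip', 'log_interval', ...) are the ones consumed by the
    functions above."""
    info_dict = batch_forward(model, batch, scaler, info_dict)
    info_dict = batch_backward(model, scaler, info_dict)
    info_dict = update_parameter_and_lr(model, optimizer, scheduler, scaler, info_dict)
    log_per_step(writer, info_dict)
    return info_dict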


def log_per_step(writer, info_dict):
    tag = info_dict["tag"]
    epoch = info_dict.get('epoch', 0)
    step = info_dict["step"]
    batch_idx = info_dict["batch_idx"]
    loss_dict = info_dict['loss_dict']
    rank = int(os.environ.get('RANK', 0))

    # only rank 0 writes to tensorboard to avoid multi-process writes
    if writer is not None:
        if (info_dict['train_engine'] == 'deepspeed' and info_dict['is_gradient_accumulation_boundary'] is True) or \
           (info_dict['train_engine'] == 'torch_ddp' and (info_dict['batch_idx'] + 1) % info_dict['accum_grad'] == 0):
            for k in ['epoch', 'lr', 'grad_norm']:
                writer.add_scalar('{}/{}'.format(tag, k), info_dict[k], step + 1)
            for k, v in loss_dict.items():
                writer.add_scalar('{}/{}'.format(tag, k), v, step + 1)

    # TRAIN & CV, shell log (stdout)
    if (info_dict['batch_idx'] + 1) % info_dict['log_interval'] == 0:
        log_str = '{} Batch {}/{} '.format(tag, epoch, batch_idx + 1)
        for name, value in loss_dict.items():
            log_str += '{} {:.6f} '.format(name, value)
        if tag == "TRAIN":
            log_str += 'lr {:.8f} grad_norm {:.6f}'.format(
                info_dict["lr"], info_dict['grad_norm'])
        log_str += ' rank {}'.format(rank)
        logging.debug(log_str)


def log_per_save(writer, info_dict):
    tag = info_dict["tag"]
    epoch = info_dict["epoch"]
    step = info_dict["step"]
    loss_dict = info_dict["loss_dict"]
    lr = info_dict['lr']
    rank = int(os.environ.get('RANK', 0))
    logging.info(
        'Epoch {} Step {} CV info lr {} {} rank {}'.format(
            epoch, step + 1, lr, ' '.join(['{}_{}'.format(k, v) for k, v in loss_dict.items()]), rank))

    if writer is not None:
        for k in ['epoch', 'lr']:
            writer.add_scalar('{}/{}'.format(tag, k), info_dict[k], step + 1)
        for k, v in loss_dict.items():
            writer.add_scalar('{}/{}'.format(tag, k), v, step + 1)