diff --git a/examples/aishell/s0/run_npu.sh b/examples/aishell/s0/run_npu.sh new file mode 100644 index 0000000000..a2f05a411a --- /dev/null +++ b/examples/aishell/s0/run_npu.sh @@ -0,0 +1,216 @@ +#!/bin/bash + +# Copyright 2019 Mobvoi Inc. All Rights Reserved. +. ./path.sh || exit 1; + +# Automatically detect number of npus +if command -v npu-smi info &> /dev/null; then + num_npus=$(npu-smi info -l | grep "Total Count" | awk '{print $4}') + npu_list=$(seq -s, 0 $((num_npus-1))) +else + num_npus=-1 + npu_list="-1" +fi + +# You can also manually specify NPU_VISIBLE_DEVICES +# if you don't want to utilize all available NPU resources. +export NPU_VISIBLE_DEVICES="${npu_list}" +echo "NPU_VISIBLE_DEVICES is ${NPU_VISIBLE_DEVICES}" + +stage=4 # start from 0 if you need to start from data preparation +stop_stage=4 + +# You should change the following two parameters for multiple machine training, +# see https://pytorch.org/docs/stable/elastic/run.html +HOST_NODE_ADDR="localhost:0" +num_nodes=1 +job_id=2023 + +# The aishell dataset location, please change this to your own path +# make sure of using absolute path. DO-NOT-USE relatvie path! +data=/export/data/asr-data/OpenSLR/33/ +data_url=www.openslr.org/resources/33 + +nj=16 +dict=data/dict/lang_char.txt + +# data_type can be `raw` or `shard`. Typically, raw is used for small dataset, +# `shard` is used for large dataset which is over 1k hours, and `shard` is +# faster on reading data and training. +data_type=raw +num_utts_per_shard=1000 + +train_set=train +# Optional train_config +# 1. conf/train_transformer.yaml: Standard transformer +# 2. conf/train_conformer.yaml: Standard conformer +# 3. conf/train_unified_conformer.yaml: Unified dynamic chunk causal conformer +# 4. conf/train_unified_transformer.yaml: Unified dynamic chunk transformer +# 5. conf/train_u2++_conformer.yaml: U2++ conformer +# 6. conf/train_u2++_transformer.yaml: U2++ transformer +# 7. conf/train_u2++_conformer.yaml: U2++ lite conformer, must load a well +# trained model, and freeze encoder module, otherwise there will be a +# autograd error +train_config=conf/train_conformer.yaml +dir=exp/conformer +tensorboard_dir=tensorboard +checkpoint= +num_workers=8 +prefetch=10 + +# use average_checkpoint will get better result +average_checkpoint=true +decode_checkpoint=$dir/final.pt +average_num=30 +decode_modes="ctc_greedy_search ctc_prefix_beam_search attention attention_rescoring" + +# specify your distributed training method among ['torch_ddp', 'torch_fsdp', 'deepspeed'] +train_engine=torch_fsdp + +deepspeed_config=conf/ds_stage2.json +deepspeed_save_states="model_only" + +# Syntax error: Bad for loop variable +. tools/parse_options.sh || exit 1; + +if [ ${stage} -le -1 ] && [ ${stop_stage} -ge -1 ]; then + echo "stage -1: Data Download" + local/download_and_untar.sh ${data} ${data_url} data_aishell + local/download_and_untar.sh ${data} ${data_url} resource_aishell +fi + +if [ ${stage} -le 0 ] && [ ${stop_stage} -ge 0 ]; then + # Data preparation + local/aishell_data_prep.sh ${data}/data_aishell/wav \ + ${data}/data_aishell/transcript +fi + + +if [ ${stage} -le 1 ] && [ ${stop_stage} -ge 1 ]; then + # remove the space between the text labels for Mandarin dataset + for x in train dev test; do + cp data/${x}/text data/${x}/text.org + paste -d " " <(cut -f 1 -d" " data/${x}/text.org) \ + <(cut -f 2- -d" " data/${x}/text.org | tr -d " ") \ + > data/${x}/text + rm data/${x}/text.org + done + + tools/compute_cmvn_stats.py --num_workers 16 --train_config $train_config \ + --in_scp data/${train_set}/wav.scp \ + --out_cmvn data/$train_set/global_cmvn +fi + +if [ ${stage} -le 2 ] && [ ${stop_stage} -ge 2 ]; then + echo "Make a dictionary" + mkdir -p $(dirname $dict) + echo " 0" > ${dict} # 0 is for "blank" in CTC + echo " 1" >> ${dict} # must be 1 + echo " 2" >> $dict + tools/text2token.py -s 1 -n 1 data/train/text | cut -f 2- -d" " \ + | tr " " "\n" | sort | uniq | grep -a -v -e '^\s*$' | \ + awk '{print $0 " " NR+2}' >> ${dict} +fi + +if [ ${stage} -le 3 ] && [ ${stop_stage} -ge 3 ]; then + echo "Prepare data, prepare required format" + for x in dev test ${train_set}; do + if [ $data_type == "shard" ]; then + tools/make_shard_list.py --num_utts_per_shard $num_utts_per_shard \ + --num_threads 16 data/$x/wav.scp data/$x/text \ + $(realpath data/$x/shards) data/$x/data.list + else + tools/make_raw_list.py data/$x/wav.scp data/$x/text \ + data/$x/data.list + fi + done +fi + +if [ ${stage} -le 4 ] && [ ${stop_stage} -ge 4 ]; then + mkdir -p $dir + num_npus=$(echo $NPU_VISIBLE_DEVICES | awk -F "," '{print NF}') + # Use "hccl" for npu if it works, otherwise use "gloo" + # NOTE(xcsong): deepspeed fails with gloo, see + # https://github.com/microsoft/DeepSpeed/issues/2818 + dist_backend="hccl" + + # train.py rewrite $train_config to $dir/train.yaml with model input + # and output dimension, and $dir/train.yaml will be used for inference + # and export. + echo "$0: using ${train_engine}" + + # NOTE(xcsong): Both ddp & deepspeed can be launched by torchrun + # NOTE(xcsong): To unify single-node & multi-node training, we add + # all related args. You should change `nnodes` & + # `rdzv_endpoint` for multi-node, see + # https://pytorch.org/docs/stable/elastic/run.html#usage + # https://github.com/wenet-e2e/wenet/pull/2055#issuecomment-1766055406 + # `rdzv_id` - A user-defined id that uniquely identifies the worker group for a job. + # This id is used by each node to join as a member of a particular worker group. + # `rdzv_endpoint` - The rendezvous backend endpoint; usually in form :. + echo "$0: num_nodes is $num_nodes, proc_per_node is $num_npus" + torchrun --nnodes=$num_nodes --nproc_per_node=$num_npus \ + --rdzv_id=$job_id --rdzv_backend="c10d" --rdzv_endpoint=$HOST_NODE_ADDR \ + wenet/bin/train.py \ + --device "npu" \ + --train_engine ${train_engine} \ + --config $train_config \ + --data_type $data_type \ + --train_data data/$train_set/data.list \ + --cv_data data/dev/data.list \ + ${checkpoint:+--checkpoint $checkpoint} \ + --model_dir $dir \ + --tensorboard_dir ${tensorboard_dir} \ + --ddp.dist_backend $dist_backend \ + --num_workers ${num_workers} \ + --prefetch ${prefetch} \ + --pin_memory \ + --deepspeed_config ${deepspeed_config} \ + --deepspeed.save_states ${deepspeed_save_states} +fi + +if [ ${stage} -le 5 ] && [ ${stop_stage} -ge 5 ]; then + # Test model, please specify the model you want to test by --checkpoint + if [ ${average_checkpoint} == true ]; then + decode_checkpoint=$dir/avg_${average_num}.pt + echo "do model average and final checkpoint is $decode_checkpoint" + python wenet/bin/average_model.py \ + --dst_model $decode_checkpoint \ + --src_path $dir \ + --num ${average_num} \ + --val_best + fi + # Please specify decoding_chunk_size for unified streaming and + # non-streaming model. The default value is -1, which is full chunk + # for non-streaming inference. + decoding_chunk_size= + ctc_weight=0.3 + reverse_weight=0.5 + python wenet/bin/recognize.py \ + --device "npu" \ + --modes $decode_modes \ + --config $dir/train.yaml \ + --data_type $data_type \ + --test_data data/test/data.list \ + --checkpoint $decode_checkpoint \ + --beam_size 10 \ + --batch_size 32 \ + --ctc_weight $ctc_weight \ + --reverse_weight $reverse_weight \ + --result_dir $dir \ + ${decoding_chunk_size:+--decoding_chunk_size $decoding_chunk_size} + for mode in ${decode_modes}; do + python tools/compute-wer.py --char=1 --v=1 \ + data/test/text $dir/$mode/text > $dir/$mode/wer + done +fi + + +if [ ${stage} -le 6 ] && [ ${stop_stage} -ge 6 ]; then + # Export the best model you want + python wenet/bin/export_jit.py \ + --config $dir/train.yaml \ + --checkpoint $dir/avg_${average_num}.pt \ + --output_file $dir/final.zip \ + --output_quant_file $dir/final_quant.zip +fi diff --git a/examples/aishell/whisper/run_npu.sh b/examples/aishell/whisper/run_npu.sh new file mode 100644 index 0000000000..eda596c786 --- /dev/null +++ b/examples/aishell/whisper/run_npu.sh @@ -0,0 +1,171 @@ +#!/bin/bash + +# Copyright 2019 Mobvoi Inc. All Rights Reserved. +. ./path.sh || exit 1; + +# Automatically detect number of npus +if command -v npu-smi info &> /dev/null; then + num_npus=$(npu-smi info -l | grep "Total Count" | awk '{print $4}') + npu_list=$(seq -s, 0 $((num_npus-1))) +else + num_npus=-1 + npu_list="-1" +fi + +# You can also manually specify NPU_VISIBLE_DEVICES +# if you don't want to utilize all available NPU resources. +export NPU_VISIBLE_DEVICES="${npu_list}" +echo "NPU_VISIBLE_DEVICES is ${NPU_VISIBLE_DEVICES}" + +stage=0 +stop_stage=0 + +# You should change the following two parameters for multiple machine training, +# see https://pytorch.org/docs/stable/elastic/run.html +HOST_NODE_ADDR="localhost:0" +num_nodes=1 +job_id=2023 + +# data_type can be `raw` or `shard`. Typically, raw is used for small dataset, +# `shard` is used for large dataset which is over 1k hours, and `shard` is +# faster on reading data and training. +data_type=raw + +train_set=train +# Optional train_config +# 1. Standard whisper largev3 +# train_config=conf/finetune_whisper_largev3.yaml +# checkpoint=exp/whisper/large-v3/wenet_whisper.init-ctc.pt +# 2. Whisper largev3 with randomly init conv2d4 +# train_config=conf/finetune_whisper_largev3_conv2d4.yaml +# checkpoint=exp/whisper/large-v3/wenet_whisper.remove-subsample.init-ctc.pt +train_config=conf/finetune_whisper_largev3_conv2d4.yaml +checkpoint=exp/whisper/large-v3/wenet_whisper.remove-subsample.init-ctc.pt +dir=exp/finetune_whisper_largev3_conv1d2 +tensorboard_dir=tensorboard +num_workers=8 +prefetch=10 + +# use average_checkpoint will get better result +average_checkpoint=true +decode_checkpoint=$dir/final.pt +average_num=5 +decode_modes="ctc_greedy_search ctc_prefix_beam_search attention attention_rescoring" +decode_device=0 +decoding_chunk_size=-1 +ctc_weight=0.3 +reverse_weight=0.0 +decode_batch=4 + +train_engine=deepspeed + +# model+optimizer or model_only, model+optimizer is more time-efficient but +# consumes more space, while model_only is the opposite +deepspeed_config=conf/ds_stage1.json +deepspeed_save_states="model+optimizer" + +. tools/parse_options.sh || exit 1; + +if [ ${stage} -le 0 ] && [ ${stop_stage} -ge 0 ]; then + mkdir -p $dir + num_npus=$(echo $NPU_VISIBLE_DEVICES | awk -F "," '{print NF}') + # Use "nccl" if it works, otherwise use "gloo" + # NOTE(xcsong): deepspeed fails with gloo, see + # https://github.com/microsoft/DeepSpeed/issues/2818 + dist_backend="hccl" + + # train.py rewrite $train_config to $dir/train.yaml with model input + # and output dimension, and $dir/train.yaml will be used for inference + # and export. + echo "$0: using ${train_engine}" + + # NOTE(xcsong): Both ddp & deepspeed can be launched by torchrun + # NOTE(xcsong): To unify single-node & multi-node training, we add + # all related args. You should change `nnodes` & + # `rdzv_endpoint` for multi-node, see + # https://pytorch.org/docs/stable/elastic/run.html#usage + # https://github.com/wenet-e2e/wenet/pull/2055#issuecomment-1766055406 + # `rdzv_id` - A user-defined id that uniquely identifies the worker group for a job. + # This id is used by each node to join as a member of a particular worker group. + # `rdzv_endpoint` - The rendezvous backend endpoint; usually in form :. + echo "$0: num_nodes is $num_nodes, proc_per_node is $num_npus" + torchrun --nnodes=$num_nodes --nproc_per_node=$num_npus \ + --rdzv_id=$job_id --rdzv_backend="c10d" --rdzv_endpoint=$HOST_NODE_ADDR \ + wenet/bin/train.py \ + --device "npu" \ + --train_engine ${train_engine} \ + --config $train_config \ + --data_type $data_type \ + --train_data data/$train_set/data.list \ + --cv_data data/dev/data.list \ + ${checkpoint:+--checkpoint $checkpoint} \ + --model_dir $dir \ + --tensorboard_dir ${tensorboard_dir} \ + --ddp.dist_backend $dist_backend \ + --num_workers ${num_workers} \ + --prefetch ${prefetch} \ + --pin_memory \ + --deepspeed_config ${deepspeed_config} \ + --deepspeed.save_states ${deepspeed_save_states} +fi + + +if [ ${stage} -le 1 ] && [ ${stop_stage} -ge 1 ]; then + if [ "$deepspeed_save_states" = "model+optimizer" ]; then + for subdir in $(find "$dir" -maxdepth 1 -type d | grep -v "^$dir$") + do + # NOTE(xcsong): zero_to_fp32.py is automatically generated by deepspeed + tag=$(basename "$subdir") + echo "$tag" + python3 ${dir}/zero_to_fp32.py \ + ${dir} ${dir}/${tag}.pt -t ${tag} + rm -rf ${dir}/${tag} + done + fi +fi + +if [ ${stage} -le 2 ] && [ ${stop_stage} -ge 2 ]; then + # Test model, please specify the model you want to test by --checkpoint + if [ ${average_checkpoint} == true ]; then + decode_checkpoint=$dir/avg_${average_num}.pt + echo "do model average and final checkpoint is $decode_checkpoint" + python wenet/bin/average_model.py \ + --dst_model $decode_checkpoint \ + --src_path $dir \ + --num ${average_num} \ + --val_best + fi + # Please specify decoding_chunk_size for unified streaming and + # non-streaming model. The default value is -1, which is full chunk + # for non-streaming inference. + base=$(basename $decode_checkpoint) + result_dir=$dir/${base}_chunk${decoding_chunk_size}_ctc${ctc_weight}_reverse${reverse_weight} + mkdir -p ${result_dir} + python wenet/bin/recognize.py --device "npu" \ + --modes $decode_modes \ + --config $dir/train.yaml \ + --data_type $data_type \ + --test_data data/test/data.list \ + --checkpoint $decode_checkpoint \ + --beam_size 10 \ + --batch_size ${decode_batch} \ + --blank_penalty 0.0 \ + --ctc_weight $ctc_weight \ + --reverse_weight $reverse_weight \ + --result_dir $result_dir \ + ${decoding_chunk_size:+--decoding_chunk_size $decoding_chunk_size} + for mode in ${decode_modes}; do + python tools/compute-wer.py --char=1 --v=1 \ + data/test/text $result_dir/$mode/text > $result_dir/$mode/wer + done +fi + + +if [ ${stage} -le 3 ] && [ ${stop_stage} -ge 3 ]; then + # Export the best model you want + python wenet/bin/export_jit.py \ + --config $dir/train.yaml \ + --checkpoint $dir/avg_${average_num}.pt \ + --output_file $dir/final.zip \ + --output_quant_file $dir/final_quant.zip +fi diff --git a/wenet/bin/alignment.py b/wenet/bin/alignment.py index 2bfda7bcba..12c272a2bd 100644 --- a/wenet/bin/alignment.py +++ b/wenet/bin/alignment.py @@ -136,6 +136,11 @@ def get_labformat(timestamp, subsample): type=int, default=-1, help='gpu id for this rank, -1 for cpu') + parser.add_argument('--device', + type=str, + default="cpu", + choices=["cpu", "npu", "cuda"], + help='accelerator to use') parser.add_argument('--blank_thres', default=0.999999, type=float, @@ -165,7 +170,11 @@ def get_labformat(timestamp, subsample): print(args) logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s') - os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu) + if args.gpu != -1: + # remain the original usage of gpu + args.device = "cuda" + if "cuda" in args.device: + os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu) if args.batch_size > 1: logging.fatal('alignment mode must be running with batch_size == 1') @@ -213,8 +222,7 @@ def get_labformat(timestamp, subsample): # Init asr model from configs model, configs = init_model(args, configs) - use_cuda = args.gpu >= 0 and torch.cuda.is_available() - device = torch.device('cuda' if use_cuda else 'cpu') + device = torch.device(args.device) model = model.to(device) model.eval() diff --git a/wenet/bin/recognize.py b/wenet/bin/recognize.py index 3779b74eca..3101d6eb38 100644 --- a/wenet/bin/recognize.py +++ b/wenet/bin/recognize.py @@ -29,6 +29,7 @@ from wenet.utils.init_tokenizer import init_tokenizer from wenet.utils.context_graph import ContextGraph from wenet.utils.ctc_utils import get_blank_id +from wenet.utils.common import TORCH_NPU_AVAILABLE # noqa just ensure to check torch-npu def get_args(): @@ -43,6 +44,11 @@ def get_args(): type=int, default=-1, help='gpu id for this rank, -1 for cpu') + parser.add_argument('--device', + type=str, + default="cpu", + choices=["cpu", "npu", "cuda"], + help='accelerator to use') parser.add_argument('--dtype', type=str, default='fp32', @@ -185,7 +191,11 @@ def main(): args = get_args() logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s %(message)s') - os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu) + if args.gpu != -1: + # remain the original usage of gpu + args.device = "cuda" + if "cuda" in args.device: + os.environ['CUDA_VISIBLE_DEVICES'] = str(args.gpu) with open(args.config, 'r') as fin: configs = yaml.load(fin, Loader=yaml.FullLoader) @@ -230,8 +240,7 @@ def main(): args.jit = False model, configs = init_model(args, configs) - use_cuda = args.gpu >= 0 and torch.cuda.is_available() - device = torch.device('cuda' if use_cuda else 'cpu') + device = torch.device(args.device) model = model.to(device) model.eval() dtype = torch.float32 diff --git a/wenet/bin/train.py b/wenet/bin/train.py index 60d6374f4e..a77021c671 100644 --- a/wenet/bin/train.py +++ b/wenet/bin/train.py @@ -24,7 +24,7 @@ import torch.distributed as dist from torch.distributed.elastic.multiprocessing.errors import record -from wenet.utils.common import lrs_to_str +from wenet.utils.common import lrs_to_str, TORCH_NPU_AVAILABLE # noqa just ensure to check torch-npu from wenet.utils.executor import Executor from wenet.utils.config import override_config @@ -45,6 +45,12 @@ def get_args(): default='torch_ddp', choices=['torch_ddp', 'torch_fsdp', 'deepspeed'], help='Engine for paralleled training') + # set default value of device to "cuda", avoiding the modify of original scripts + parser.add_argument('--device', + type=str, + default='cuda', + choices=["cpu", "npu", "cuda"], + help='accelerator for training') parser = add_model_args(parser) parser = add_dataset_args(parser) parser = add_ddp_args(parser) @@ -118,7 +124,8 @@ def main(): # Get executor tag = configs["init_infos"].get("tag", "init") - executor = Executor(global_step=configs["init_infos"].get('step', -1)) + executor = Executor(global_step=configs["init_infos"].get('step', -1), + device=device) # Init scaler, used for pytorch amp mixed precision training scaler = init_scaler(args) diff --git a/wenet/cli/model.py b/wenet/cli/model.py index 182bfc5f63..ef4a301ec2 100644 --- a/wenet/cli/model.py +++ b/wenet/cli/model.py @@ -25,6 +25,7 @@ from wenet.transformer.search import (attention_rescoring, ctc_prefix_beam_search, DecodeResult) from wenet.utils.context_graph import ContextGraph +from wenet.utils.common import TORCH_NPU_AVAILABLE # noqa just ensure to check torch-npu class Model: @@ -46,7 +47,6 @@ def __init__(self, else: device = 'cpu' self.device = torch.device(device) - self.model = self.model.to(self.device) self.symbol_table = read_symbol_table(units_path) self.char_dict = {v: k for k, v in self.symbol_table.items()} self.beam = beam @@ -63,13 +63,19 @@ def compute_feats(self, audio_file: str) -> torch.Tensor: if sample_rate != self.resample_rate: waveform = torchaudio.transforms.Resample( orig_freq=sample_rate, new_freq=self.resample_rate)(waveform) - waveform = waveform.to(self.device) + # NOTE (MengqingCao): complex dtype not supported in torch_npu.abs() now, + # thus, delay placing data on NPU after the calculation of fbank. + # revert me after complex dtype is supported. + if "npu" not in self.device.__str__(): + waveform = waveform.to(self.device) feats = kaldi.fbank(waveform, num_mel_bins=80, frame_length=25, frame_shift=10, energy_floor=0.0, sample_frequency=self.resample_rate) + if "npu" in self.device.__str__(): + feats = feats.to(self.device) feats = feats.unsqueeze(0) return feats @@ -155,7 +161,15 @@ def load_model(language: str = None, gpu: int = -1, beam: int = 5, context_path: str = None, - context_score: float = 6.0) -> Model: + context_score: float = 6.0, + device: str = "cpu") -> Model: if model_dir is None: model_dir = Hub.get_model_by_lang(language) - return Model(model_dir, gpu, beam, context_path, context_score) + + if gpu != -1: + # remain the original usage of gpu + device = "cuda" + model = Model(model_dir, beam, context_path, context_score) + model.device = torch.device(device) + model.model.to(device) + return model diff --git a/wenet/cli/paraformer_model.py b/wenet/cli/paraformer_model.py index a43814a3af..4f77758f2d 100644 --- a/wenet/cli/paraformer_model.py +++ b/wenet/cli/paraformer_model.py @@ -8,25 +8,18 @@ from wenet.paraformer.search import (gen_timestamps_from_peak, paraformer_greedy_search) from wenet.text.paraformer_tokenizer import ParaformerTokenizer +from wenet.utils.common import TORCH_NPU_AVAILABLE # noqa just ensure to check torch-npu class Paraformer: - def __init__(self, - model_dir: str, - device: int = -1, - resample_rate: int = 16000) -> None: + def __init__(self, model_dir: str, resample_rate: int = 16000) -> None: model_path = os.path.join(model_dir, 'final.zip') units_path = os.path.join(model_dir, 'units.txt') self.model = torch.jit.load(model_path) self.resample_rate = resample_rate - if device >= 0: - device = 'cuda:{}'.format(device) - else: - device = 'cpu' - self.device = torch.device(device) - self.model = self.model.to(self.device) + self.device = torch.device("cpu") self.tokenizer = ParaformerTokenizer(symbol_table=units_path) def transcribe(self, audio_file: str, tokens_info: bool = False) -> dict: @@ -75,7 +68,15 @@ def align(self, audio_file: str, label: str) -> dict: raise NotImplementedError("Align is currently not supported") -def load_model(model_dir: str = None, gpu: int = -1) -> Paraformer: +def load_model(model_dir: str = None, + gpu: int = -1, + device: str = "cpu") -> Paraformer: if model_dir is None: model_dir = Hub.get_model_by_lang('paraformer') - return Paraformer(model_dir, gpu) + if gpu != -1: + # remain the original usage of gpu + device = "cuda" + paraformer = Paraformer(model_dir) + paraformer.device = torch.device(device) + paraformer.model.to(device) + return paraformer diff --git a/wenet/cli/transcribe.py b/wenet/cli/transcribe.py index 7fe6a1c473..28bf279192 100644 --- a/wenet/cli/transcribe.py +++ b/wenet/cli/transcribe.py @@ -38,6 +38,11 @@ def get_args(): type=int, default='-1', help='gpu id to decode, default is cpu.') + parser.add_argument('--device', + type=str, + default='cpu', + choices=["cpu", "npu", "cuda"], + help='accelerator to use') parser.add_argument('-t', '--show_tokens_info', action='store_true', @@ -67,10 +72,10 @@ def main(): args = get_args() if args.paraformer: - model = load_paraformer(args.model_dir, args.gpu) + model = load_paraformer(args.model_dir, args.gpu, args.device) else: model = load_model(args.language, args.model_dir, args.gpu, args.beam, - args.context_path, args.context_score) + args.context_path, args.context_score, args.device) if args.align: result = model.align(args.audio_file, args.label) else: diff --git a/wenet/k2/model.py b/wenet/k2/model.py index d76d89dd78..2baabbfb2b 100644 --- a/wenet/k2/model.py +++ b/wenet/k2/model.py @@ -27,18 +27,19 @@ class K2Model(ASRModel): def __init__( - self, - vocab_size: int, - encoder: TransformerEncoder, - decoder: TransformerDecoder, - ctc: CTC, - ctc_weight: float = 0.5, - ignore_id: int = IGNORE_ID, - reverse_weight: float = 0.0, - lsm_weight: float = 0.0, - length_normalized_loss: bool = False, - lfmmi_dir: str = '', - special_tokens: dict = None, + self, + vocab_size: int, + encoder: TransformerEncoder, + decoder: TransformerDecoder, + ctc: CTC, + ctc_weight: float = 0.5, + ignore_id: int = IGNORE_ID, + reverse_weight: float = 0.0, + lsm_weight: float = 0.0, + length_normalized_loss: bool = False, + lfmmi_dir: str = '', + special_tokens: dict = None, + device: torch.device = torch.device("cuda"), ): super().__init__(vocab_size, encoder, @@ -51,6 +52,7 @@ def __init__( length_normalized_loss, special_tokens=special_tokens) self.lfmmi_dir = lfmmi_dir + self.device = device if self.lfmmi_dir != '': self.load_lfmmi_resource() @@ -74,7 +76,7 @@ def load_lfmmi_resource(self): arr = line.strip().split() if arr[0] == '': self.sos_eos_id = int(arr[1]) - device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') + device = torch.device(self.device) self.graph_compiler = icefall.mmi_graph_compiler.MmiTrainingGraphCompiler( self.lfmmi_dir, device=device, @@ -124,8 +126,7 @@ def load_hlg_resource_if_necessary(self, hlg, word): except ImportError: print('Error: Failed to import k2') if not hasattr(self, 'hlg'): - device = torch.device( - 'cuda' if torch.cuda.is_available() else 'cpu') + device = torch.device(self.device) self.hlg = k2.Fsa.from_dict(torch.load(hlg, map_location=device)) if not hasattr(self.hlg, "lm_scores"): self.hlg.lm_scores = self.hlg.scores.clone() diff --git a/wenet/transducer/transducer.py b/wenet/transducer/transducer.py index ff730cd500..1348fff672 100644 --- a/wenet/transducer/transducer.py +++ b/wenet/transducer/transducer.py @@ -13,7 +13,7 @@ from wenet.transformer.decoder import BiTransformerDecoder, TransformerDecoder from wenet.transformer.label_smoothing_loss import LabelSmoothingLoss from wenet.utils.common import (IGNORE_ID, add_blank, add_sos_eos, - reverse_pad_list) + reverse_pad_list, TORCH_NPU_AVAILABLE) class Transducer(ASRModel): @@ -98,6 +98,7 @@ def forward( ) -> Dict[str, Optional[torch.Tensor]]: """Frontend + Encoder + predictor + joint + loss """ + self.device = device speech = batch['feats'].to(device) speech_lengths = batch['feats_lengths'].to(device) text = batch['target'].to(device) @@ -514,7 +515,10 @@ def _compute_loss(self, rnnt_text = torch.where(text == self.ignore_id, 0, text) lm = self.simple_lm_proj(predictor_out) am = self.simple_am_proj(encoder_out) - with torch.cuda.amp.autocast(enabled=False): + amp_autocast = torch.cuda.amp.autocast + if "npu" in self.device.__str__() and TORCH_NPU_AVAILABLE: + amp_autocast = torch.npu.amp.autocast + with amp_autocast(enabled=False): simple_loss, (px_grad, py_grad) = k2.rnnt_loss_smoothed( lm=lm.float(), am=am.float(), @@ -544,7 +548,7 @@ def _compute_loss(self, lm_pruned, pre_project=False, ) - with torch.cuda.amp.autocast(enabled=False): + with amp_autocast(enabled=False): pruned_loss = k2.rnnt_loss_pruned( logits=logits.float(), symbols=rnnt_text, diff --git a/wenet/utils/common.py b/wenet/utils/common.py index cdc7610f13..41488d5c76 100644 --- a/wenet/utils/common.py +++ b/wenet/utils/common.py @@ -357,3 +357,21 @@ def tensor_to_scalar(x): if torch.is_tensor(x): return x.item() return x + + +def is_torch_npu_available() -> bool: + ''' + check if torch_npu is available. + torch_npu is a npu adapter of PyTorch + ''' + try: + import torch_npu # noqa + return True + except ImportError: + if not torch.cuda.is_available(): + print("Module \"torch_npu\" not found. \"pip install torch_npu\" \ + if you are using Ascend NPU, otherwise, ignore it") + return False + + +TORCH_NPU_AVAILABLE = is_torch_npu_available() diff --git a/wenet/utils/executor.py b/wenet/utils/executor.py index 999a23e171..e7a61f22cf 100644 --- a/wenet/utils/executor.py +++ b/wenet/utils/executor.py @@ -30,10 +30,13 @@ class Executor: - def __init__(self, global_step: int = 0): + def __init__(self, + global_step: int = 0, + device: torch.device = torch.device("cpu")): self.step = global_step + 1 self.train_step_timer = None self.cv_step_timer = None + self.device = device def train(self, model, optimizer, scheduler, train_data_loader, cv_data_loader, writer, configs, scaler, group_join): @@ -79,7 +82,7 @@ def train(self, model, optimizer, scheduler, train_data_loader, with context(): info_dict = batch_forward(model, batch_dict, scaler, - info_dict) + info_dict, self.device) info_dict = batch_backward(model, scaler, info_dict) info_dict = update_parameter_and_lr(model, optimizer, @@ -135,7 +138,8 @@ def cv(self, model, cv_data_loader, configs): if num_utts == 0: continue - info_dict = batch_forward(model, batch_dict, None, info_dict) + info_dict = batch_forward(model, batch_dict, None, info_dict, + self.device) _dict = info_dict["loss_dict"] num_seen_utts += num_utts diff --git a/wenet/utils/train_utils.py b/wenet/utils/train_utils.py index cdf6da2b3a..538753e784 100644 --- a/wenet/utils/train_utils.py +++ b/wenet/utils/train_utils.py @@ -48,6 +48,7 @@ wenet_fsdp_wrap_policy) from wenet.utils.scheduler import WarmupLR, NoamHoldAnnealing from wenet.utils.ctc_utils import get_blank_id +from wenet.utils.common import TORCH_NPU_AVAILABLE def add_model_args(parser): @@ -146,7 +147,7 @@ def add_ddp_args(parser): parser.add_argument('--ddp.dist_backend', dest='dist_backend', default='nccl', - choices=['nccl', 'gloo'], + choices=['nccl', 'gloo', "hccl"], help='distributed backend') parser.add_argument('--use_amp', action='store_true', @@ -221,7 +222,12 @@ def init_distributed(args): logging.info('training on multiple gpus, this gpu {}'.format(local_rank) + ', rank {}, world_size {}'.format(rank, world_size)) if args.train_engine in ["torch_ddp", "torch_fsdp"]: - torch.cuda.set_device(local_rank) + if "cuda" in args.device: + torch.cuda.set_device(local_rank) + elif "npu" in args.device and TORCH_NPU_AVAILABLE: + torch.npu.set_device(local_rank) + else: + logging.error("not supported device: {}".format(args.device)) dist.init_process_group(args.dist_backend) elif args.train_engine == "deepspeed": deepspeed.init_distributed(dist_backend=args.dist_backend) @@ -370,11 +376,10 @@ def wrap_cuda_model(args, model, configs=None): else: grad_ckpt = False if args.train_engine == "torch_ddp": # native pytorch ddp - assert (torch.cuda.is_available()) - model.cuda() + device = torch.device(args.device) + model.to(device) model = torch.nn.parallel.DistributedDataParallel( model, find_unused_parameters=not grad_ckpt) - device = torch.device("cuda") elif args.train_engine == "deepspeed": # deepspeed # NOTE(xcsong): look in detail how the memory estimator API works: # https://deepspeed.readthedocs.io/en/latest/memory.html#discussion @@ -389,7 +394,7 @@ def wrap_cuda_model(args, model, configs=None): model, num_gpus_per_node=local_world_size, num_nodes=world_size // local_world_size) - device = None # Init device later + device = torch.device(args.device) # Init device later pass # Init DeepSpeed later elif args.train_engine == 'torch_fsdp': assert configs is not None @@ -407,6 +412,12 @@ def wrap_cuda_model(args, model, configs=None): }[args.fsdp_sharding_strategy] wrap_policy = wenet_fsdp_wrap_policy(mode=args.fsdp_sharding_strategy) layer_types = check_gradient_checkpoint(model) + if "cuda" in args.device: + device_id = torch.cuda.current_device() + elif "npu" in args.device and TORCH_NPU_AVAILABLE: + device_id = torch.npu.current_device() + else: + logging.error("not supported device: {}".format(args.device)) model = FSDP( model, auto_wrap_policy=wrap_policy, @@ -423,10 +434,9 @@ def wrap_cuda_model(args, model, configs=None): sync_module_states=args.fsdp_sync_module_states, # init_distributed is called (torch.cuda.set_device), # we should set device_id, see FSDP api - device_id=torch.cuda.current_device(), - ) + device_id=device_id) apply_fsdp_checkpointing(model, layer_types) - device = torch.device("cuda") + device = torch.device(args.device) else: logging.error("not supported engine: {}".format(args.train_engine)) if args.train_engine in ["torch_fsdp", "torch_ddp"]: @@ -542,7 +552,12 @@ def init_summarywriter(args): def init_scaler(args): scaler = None if args.use_amp: - scaler = torch.cuda.amp.GradScaler() + if "cuda" in args.device: + scaler = torch.cuda.amp.GradScaler() + elif "npu" in args.device and TORCH_NPU_AVAILABLE: + scaler = torch.npu.amp.GradScaler() + else: + logging.error("not supported device: {}".format(args.device)) elif args.train_engine == 'torch_fsdp': # why bf16 don't need scaler: # https://discuss.pytorch.org/t/why-bf16-do-not-need-loss-scaling/176596 @@ -612,9 +627,8 @@ def wenet_join(group_join, info_dict): return False -def batch_forward(model, batch, scaler, info_dict): +def batch_forward(model, batch, scaler, info_dict, device): train_engine = info_dict.get('train_engine', "torch_ddp") - device = int(os.environ.get('LOCAL_RANK', 0)) accum_grad = info_dict.get('accum_grad', 1) dtype = info_dict.get("dtype", "fp32") @@ -628,15 +642,18 @@ def batch_forward(model, batch, scaler, info_dict): # autocast context # The more details about amp can be found in # https://pytorch.org/docs/stable/notes/amp_examples.html + amp_autocast = torch.cuda.amp.autocast + if "npu" in device.__str__() and TORCH_NPU_AVAILABLE: + amp_autocast = torch.npu.amp.autocast autocast = { "deepspeed": - torch.cuda.amp.autocast(enabled=dtype is not None, - dtype=dtype, - cache_enabled=False), + amp_autocast(enabled=dtype is not None, + dtype=dtype, + cache_enabled=False), "torch_ddp": - torch.cuda.amp.autocast(enabled=scaler is not None), + amp_autocast(enabled=scaler is not None), "torch_fsdp": - torch.cuda.amp.autocast(enabled=True, dtype=dtype) + amp_autocast(enabled=True, dtype=dtype) if dtype is not None else nullcontext() }[train_engine] with autocast: