From 7f12cdd80ff42b46d7c68fcb6d6668cd6482a23f Mon Sep 17 00:00:00 2001
From: liuzx
Date: Thu, 21 Jul 2022 15:05:43 +0800
Subject: [PATCH] Add a data-parallel training example for the debugging image
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 rank_table_2pcs.json            |  14 ++++
 run.sh                          |  27 ++++++++
 train_dataparallel_debug_env.py | 109 ++++++++++++++++++++++++++++++++
 3 files changed, 150 insertions(+)
 create mode 100644 rank_table_2pcs.json
 create mode 100644 run.sh
 create mode 100644 train_dataparallel_debug_env.py

diff --git a/rank_table_2pcs.json b/rank_table_2pcs.json
new file mode 100644
index 0000000..a7e65a7
--- /dev/null
+++ b/rank_table_2pcs.json
@@ -0,0 +1,14 @@
+{
+    "version": "1.0",
+    "server_count": "1",
+    "server_list": [
+        {
+            "server_id": "10.155.111.140",
+            "device": [
+                {"device_id": "0", "device_ip": "192.1.27.6", "rank_id": "0"},
+                {"device_id": "1", "device_ip": "192.2.27.6", "rank_id": "1"}],
+            "host_nic_ip": "reserve"
+        }
+    ],
+    "status": "completed"
+}
diff --git a/run.sh b/run.sh
new file mode 100644
index 0000000..bb3c872
--- /dev/null
+++ b/run.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+set -e
+EXEC_PATH=$(pwd)
+
+export RANK_SIZE=2
+test_dist_2pcs()
+{
+    export RANK_TABLE_FILE=${EXEC_PATH}/rank_table_2pcs.json
+    export RANK_SIZE=2
+}
+
+test_dist_${RANK_SIZE}pcs
+
+for ((i = 0; i < 2; i++))
+do
+    rm -rf device$i
+    mkdir device$i
+    cp ./train_dataparallel_debug_env.py ./config.py ./lenet.py ./dataset_distributed.py ./device$i
+    cd ./device$i
+    export DEVICE_ID=$i
+    export RANK_ID=$i
+    echo "start training for device $i"
+    env > env$i.log
+    python ./train_dataparallel_debug_env.py > train_dataparallel_debug_env.log$i 2>&1 &
+    cd ../
+done
+echo "The program launch succeeded; the logs are under device0/train_dataparallel_debug_env.log0 and device1/train_dataparallel_debug_env.log1."
\ No newline at end of file
diff --git a/train_dataparallel_debug_env.py b/train_dataparallel_debug_env.py
new file mode 100644
index 0000000..395585c
--- /dev/null
+++ b/train_dataparallel_debug_env.py
@@ -0,0 +1,109 @@
+# For distributed data-parallel training in the training image, no environment variables need to be set; the image already provides them by default.
+# For distributed data-parallel training in the debugging image, the environment variables must be set manually; see the variable definitions in run.sh used by this example.
+
+import os
+import argparse
+from dataset_distributed import create_dataset_parallel
+# import moxing as mox
+from config import mnist_cfg as cfg
+# from dataset import create_dataset
+from lenet import LeNet5
+import mindspore.nn as nn
+from mindspore import context
+from mindspore.common import set_seed
+from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
+from mindspore.train import Model
+from mindspore.nn.metrics import Accuracy
+from mindspore.context import ParallelMode
+from mindspore.communication.management import init, get_rank, get_group_size
+import mindspore.ops as ops
+
+
+# Set the device and initialize HCCL communication; DEVICE_ID comes from run.sh.
+device_id = int(os.getenv('DEVICE_ID'))
+context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
+# context.set_context(device_id=device_id)
+init()
+
+### Define whether the task runs in the training environment or the debugging environment ###
+def WorkEnvironment(environment):
+    if environment == 'train':
+        workroot = '/home/work/user-job-dir'
+    elif environment == 'debug':
+        workroot = '/home/ma-user/work'
+    print('current work mode: ' + environment + ', workroot: ' + workroot)
+    return workroot
+
+parser = argparse.ArgumentParser(description='MindSpore LeNet Example')
+parser.add_argument('--data_url',
+                    help='path to training/inference dataset folder',
+                    default=WorkEnvironment('debug') + '/data/')
+
+parser.add_argument('--train_url',
+                    help='model folder to save/load',
+                    default=WorkEnvironment('debug') + '/model/')
+
+parser.add_argument(
+    '--device_target',
+    type=str,
+    default="Ascend",
+    choices=['Ascend', 'CPU'],
+    help='device where the code will be run (default: Ascend); to use the CPU on the Qizhi platform, set device_target=CPU')
+
+parser.add_argument('--epoch_size',
+                    type=int,
+                    default=5,
+                    help='Training epochs.')
+set_seed(114514)
+if __name__ == "__main__":
+    # args = parser.parse_args()
+    ### Define the training environment ###
+    environment = 'debug'
+    workroot = WorkEnvironment(environment)
+
+    ### Initialize the data and model directories in the image ###
+    data_dir = workroot + '/data'
+    train_dir = workroot + '/model'
+    if not os.path.exists(data_dir):
+        os.makedirs(data_dir)
+    if not os.path.exists(train_dir):
+        os.makedirs(train_dir)
+
+    ### Copy the dataset from OBS to the training image ###
+    # ObsToEnv(args.data_url, data_dir)
+
+    context.reset_auto_parallel_context()
+    context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
+    ds_train = create_dataset_parallel(os.path.join(data_dir, "train"),
+                                       cfg.batch_size)
+    if ds_train.get_dataset_size() == 0:
+        raise ValueError(
+            "Please check dataset size > 0 and batch_size <= dataset size")
+    network = LeNet5(cfg.num_classes)
+    net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
+    net_opt = nn.Momentum(network.trainable_params(), cfg.lr, cfg.momentum)
+    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
+
+    model = Model(network,
+                  net_loss,
+                  net_opt,
+                  metrics={"accuracy": Accuracy()},
+                  amp_level="O2")
+
+    config_ck = CheckpointConfig(
+        save_checkpoint_steps=cfg.save_checkpoint_steps,
+        keep_checkpoint_max=cfg.keep_checkpoint_max)
+    # Note that ModelCheckpoint saves a model file on each card, so a per-card save path must be specified.
+    # In this example, get_rank() is appended to the directory to keep the paths distinct.
+    ckpoint_cb = ModelCheckpoint(prefix="data_parallel",
+                                 directory=train_dir + "/" + str(get_rank()) + "/",
+                                 config=config_ck)
+    print("============== Starting Training ==============")
+    epoch_size = cfg['epoch_size']
+    print('epoch_size is: ', epoch_size)
+
+    model.train(epoch_size,
+                ds_train,
+                callbacks=[time_cb, ckpoint_cb, LossMonitor()],
+                dataset_sink_mode=True)
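
Note: run.sh copies dataset_distributed.py, config.py and lenet.py next to the training script, but those files are not part of this patch. For reference, a minimal sketch of what dataset_distributed.create_dataset_parallel() is assumed to look like is shown below; it follows the standard MindSpore MNIST pipeline and shards the dataset across cards with get_rank()/get_group_size(). The exact transforms and defaults in the real file may differ.

    # Sketch of dataset_distributed.py (assumed, not included in this patch):
    # each card reads only its own shard of the MNIST training set.
    import mindspore.dataset as ds
    import mindspore.dataset.transforms.c_transforms as C
    import mindspore.dataset.vision.c_transforms as CV
    from mindspore import dtype as mstype
    from mindspore.dataset.vision import Inter
    from mindspore.communication.management import get_rank, get_group_size

    def create_dataset_parallel(data_path, batch_size=32, repeat_size=1,
                                num_parallel_workers=1):
        rank_id = get_rank()
        rank_size = get_group_size()
        # Split the dataset into rank_size shards; this card loads shard rank_id.
        mnist_ds = ds.MnistDataset(data_path, num_shards=rank_size, shard_id=rank_id)

        # Standard LeNet/MNIST preprocessing: resize, rescale, normalize, HWC -> CHW.
        resize_op = CV.Resize((32, 32), interpolation=Inter.LINEAR)
        rescale_op = CV.Rescale(1.0 / 255.0, 0.0)
        rescale_nml_op = CV.Rescale(1 / 0.3081, -1 * 0.1307 / 0.3081)
        hwc2chw_op = CV.HWC2CHW()
        type_cast_op = C.TypeCast(mstype.int32)

        mnist_ds = mnist_ds.map(operations=type_cast_op, input_columns="label",
                                num_parallel_workers=num_parallel_workers)
        mnist_ds = mnist_ds.map(operations=[resize_op, rescale_op, rescale_nml_op, hwc2chw_op],
                                input_columns="image",
                                num_parallel_workers=num_parallel_workers)
        mnist_ds = mnist_ds.shuffle(buffer_size=10000)
        mnist_ds = mnist_ds.batch(batch_size, drop_remainder=True)
        return mnist_ds.repeat(repeat_size)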
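
Similarly, config.py is assumed to export mnist_cfg as an EasyDict carrying the fields the training script reads (num_classes, lr, momentum, epoch_size, batch_size, save_checkpoint_steps, keep_checkpoint_max). A hypothetical example with illustrative values only:

    # Sketch of config.py (assumed; the concrete values are illustrative, not from this patch).
    from easydict import EasyDict as edict

    mnist_cfg = edict({
        'num_classes': 10,           # MNIST has 10 digit classes
        'lr': 0.01,                  # learning rate for nn.Momentum
        'momentum': 0.9,
        'epoch_size': 5,
        'batch_size': 32,
        'save_checkpoint_steps': 1875,
        'keep_checkpoint_max': 10,
    })

An EasyDict supports both attribute and item access, which is why cfg.batch_size and cfg['epoch_size'] can coexist in the training script. To launch the example in the debugging image, place the MNIST training set under /home/ma-user/work/data/train and run bash run.sh from the directory containing these files.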