增加在调试镜像下的数据并行训练示例
This commit is contained in:
parent
142411e353
commit
7f12cdd80f
|
@ -0,0 +1,14 @@
|
||||||
|
{
|
||||||
|
"version": "1.0",
|
||||||
|
"server_count": "1",
|
||||||
|
"server_list": [
|
||||||
|
{
|
||||||
|
"server_id": "10.155.111.140",
|
||||||
|
"device": [
|
||||||
|
{"device_id": "0","device_ip": "192.1.27.6","rank_id": "0"},
|
||||||
|
{"device_id": "1","device_ip": "192.2.27.6","rank_id": "1"}],
|
||||||
|
"host_nic_ip": "reserve"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"status": "completed"
|
||||||
|
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
#!/bin/bash
|
||||||
|
set -e
|
||||||
|
EXEC_PATH=$(pwd)
|
||||||
|
|
||||||
|
export RANK_SIZE=2
|
||||||
|
test_dist_2pcs()
|
||||||
|
{
|
||||||
|
export RANK_TABLE_FILE=${EXEC_PATH}/rank_table_2pcs.json
|
||||||
|
export RANK_SIZE=2
|
||||||
|
}
|
||||||
|
|
||||||
|
test_dist_${RANK_SIZE}pcs
|
||||||
|
|
||||||
|
for((i=0;i<2;i++))
|
||||||
|
do
|
||||||
|
rm -rf device$i
|
||||||
|
mkdir device$i
|
||||||
|
cp ./train_dataparallel_debug_env.py ./config.py ./lenet.py ./dataset_distributed.py ./device$i
|
||||||
|
cd ./device$i
|
||||||
|
export DEVICE_ID=$i
|
||||||
|
export RANK_ID=$i
|
||||||
|
echo "start training for device $i"
|
||||||
|
env > env$i.log
|
||||||
|
python ./train_dataparallel_debug_env.py > train_dataparallel_debug_env.log$i 2>&1 &
|
||||||
|
cd ../
|
||||||
|
done
|
||||||
|
echo "The program launch succeed, the log is under device0/train.log0."
|
|
@ -0,0 +1,109 @@
|
||||||
|
#在训练镜像中进行分布式并行训练,不需要配置环境变量,镜像中已默认配置好
|
||||||
|
#在调试镜像中进行分布式并行训练,需要配置环境变量,以本示例为例,可参考run.sh的变量定义
|
||||||
|
|
||||||
|
import os
|
||||||
|
import argparse
|
||||||
|
from dataset_distributed import create_dataset_parallel
|
||||||
|
# import moxing as mox
|
||||||
|
from config import mnist_cfg as cfg
|
||||||
|
# from dataset import create_dataset
|
||||||
|
from lenet import LeNet5
|
||||||
|
import mindspore.nn as nn
|
||||||
|
from mindspore import context
|
||||||
|
from mindspore.common import set_seed
|
||||||
|
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
|
||||||
|
from mindspore.train import Model
|
||||||
|
from mindspore.nn.metrics import Accuracy
|
||||||
|
from mindspore.context import ParallelMode
|
||||||
|
from mindspore.communication.management import init, get_rank, get_group_size
|
||||||
|
import mindspore.ops as ops
|
||||||
|
|
||||||
|
|
||||||
|
# set device_id and init
|
||||||
|
device_id = int(os.getenv('DEVICE_ID'))
|
||||||
|
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
|
||||||
|
# context.set_context(device_id=device_id)
|
||||||
|
init()
|
||||||
|
|
||||||
|
### Defines whether the task is a training environment or a debugging environment ###
|
||||||
|
def WorkEnvironment(environment):
|
||||||
|
if environment == 'train':
|
||||||
|
workroot = '/home/work/user-job-dir'
|
||||||
|
elif environment == 'debug':
|
||||||
|
workroot = '/home/ma-user/work'
|
||||||
|
print('current work mode:' + environment + ', workroot:' + workroot)
|
||||||
|
return workroot
|
||||||
|
|
||||||
|
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')
|
||||||
|
parser.add_argument('--data_url',
|
||||||
|
help='path to training/inference dataset folder',
|
||||||
|
default= WorkEnvironment('debug') + '/data/')
|
||||||
|
|
||||||
|
parser.add_argument('--train_url',
|
||||||
|
help='model folder to save/load',
|
||||||
|
default= WorkEnvironment('debug') + '/model/')
|
||||||
|
|
||||||
|
parser.add_argument(
|
||||||
|
'--device_target',
|
||||||
|
type=str,
|
||||||
|
default="Ascend",
|
||||||
|
choices=['Ascend', 'CPU'],
|
||||||
|
help='device where the code will be implemented (default: Ascend),if to use the CPU on the Qizhi platform:device_target=CPU')
|
||||||
|
|
||||||
|
parser.add_argument('--epoch_size',
|
||||||
|
type=int,
|
||||||
|
default=5,
|
||||||
|
help='Training epochs.')
|
||||||
|
set_seed(114514)
|
||||||
|
if __name__ == "__main__":
|
||||||
|
# args = parser.parse_args()
|
||||||
|
### defining the training environment
|
||||||
|
environment = 'debug'
|
||||||
|
workroot = WorkEnvironment(environment)
|
||||||
|
|
||||||
|
###Initialize the data and model directories in the training image###
|
||||||
|
data_dir = workroot + '/data'
|
||||||
|
train_dir = workroot + '/model'
|
||||||
|
if not os.path.exists(data_dir):
|
||||||
|
os.makedirs(data_dir)
|
||||||
|
if not os.path.exists(train_dir):
|
||||||
|
os.makedirs(train_dir)
|
||||||
|
|
||||||
|
### Copy the dataset from obs to the training image ###
|
||||||
|
# ObsToEnv(args.data_url,data_dir)
|
||||||
|
|
||||||
|
context.reset_auto_parallel_context()
|
||||||
|
context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
|
||||||
|
ds_train = create_dataset_parallel(os.path.join(data_dir, "train"),
|
||||||
|
cfg.batch_size)
|
||||||
|
if ds_train.get_dataset_size() == 0:
|
||||||
|
raise ValueError(
|
||||||
|
"Please check dataset size > 0 and batch_size <= dataset size")
|
||||||
|
network = LeNet5(cfg.num_classes)
|
||||||
|
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
|
||||||
|
net_opt = nn.Momentum(network.trainable_params(), cfg.lr, cfg.momentum)
|
||||||
|
time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
|
||||||
|
|
||||||
|
model = Model(network,
|
||||||
|
net_loss,
|
||||||
|
net_opt,
|
||||||
|
metrics={"accuracy": Accuracy()},
|
||||||
|
amp_level="O2")
|
||||||
|
|
||||||
|
config_ck = CheckpointConfig(
|
||||||
|
save_checkpoint_steps=cfg.save_checkpoint_steps,
|
||||||
|
keep_checkpoint_max=cfg.keep_checkpoint_max)
|
||||||
|
#Note that this method saves the model file on each card. You need to specify the save path on each card.
|
||||||
|
# In the example, get_rank() is added to distinguish different paths.
|
||||||
|
ckpoint_cb = ModelCheckpoint(prefix="data_parallel",
|
||||||
|
directory=train_dir + "/" + str(get_rank()) + "/",
|
||||||
|
config=config_ck)
|
||||||
|
print("============== Starting Training ==============")
|
||||||
|
epoch_size = cfg['epoch_size']
|
||||||
|
print('epoch_size is: ', epoch_size)
|
||||||
|
|
||||||
|
model.train(epoch_size,
|
||||||
|
ds_train,
|
||||||
|
callbacks=[time_cb, ckpoint_cb,
|
||||||
|
LossMonitor()], dataset_sink_mode=True)
|
||||||
|
|
Loading…
Reference in New Issue