增加多卡数据并行训练示例

This commit is contained in:
liuzx 2022-07-14 15:14:44 +08:00
parent a1ba7f8122
commit 1affe8a546
2 changed files with 243 additions and 16 deletions

View File

@ -1,20 +1,6 @@
# Copyright 2020 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
Produce the dataset
"""
Produce the dataset
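Unlike the single-device case, the dataset interface must be given the num_shards and shard_id arguments,
which correspond to the number of devices and the logical index of the current device.
It is recommended to obtain them through the HCCL interface:
get_rank: get the ID of the current device in the cluster
get_group_size: get the number of devices in the cluster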

241
train_autoparallel.py Normal file
View File

@ -0,0 +1,241 @@
"""
######################## single-dataset train lenet example ########################
This example is a single-dataset training tutorial. If it is a multi-dataset, please refer to the multi-dataset training
tutorial train_for_multidataset.py. This example cannot be used for multi-datasets!
######################## Instructions for using the training environment ########################
The image of the debugging environment and the image of the training environment are two different images,
and the working local directories are different. In the training task, you need to pay attention to the following points.
1. (1) The structure of the dataset uploaded for single dataset training in this example
MNISTData.zip
test
t10k-images-idx3-ubyte
t10k-labels-idx1-ubyte
train
train-images-idx3-ubyte
train-labels-idx1-ubyte
(2)The dataset structure of the single dataset in the training image in this example
workroot
data
| test
| train
2. Single dataset training requires predefined functions
(1)Defines whether the task is a training environment or a debugging environment.
def WorkEnvironment(environment):
if environment == 'train':
workroot = '/home/work/user-job-dir' #The training task uses this parameter to represent the local path of the training image
elif environment == 'debug':
workroot = '/home/ma-user/work' #The debug task uses this parameter to represent the local path of the debug image
print('current work mode:' + environment + ', workroot:' + workroot)
return workroot
(2)Copy single dataset from obs to training image.
def ObsToEnv(obs_data_url, data_dir):
try:
mox.file.copy_parallel(obs_data_url, data_dir)
print("Successfully Download {} to {}".format(obs_data_url, data_dir))
except Exception as e:
print('moxing download {} to {} failed: '.format(obs_data_url, data_dir) + str(e))
return
(3)Copy the output model to obs.
def EnvToObs(train_dir, obs_train_url):
try:
mox.file.copy_parallel(train_dir, obs_train_url)
print("Successfully Upload {} to {}".format(train_dir,obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(train_dir,obs_train_url) + str(e))
return
3. 3 parameters need to be defined
--data_url is the dataset you selected on the Qizhi platform
--data_url,--train_url,--device_target,These 3 parameters must be defined first in a single dataset task,
otherwise an error will be reported.
There is no need to add these parameters to the running parameters of the Qizhi platform,
because they are predefined in the background, you only need to define them in your code.
4. How the dataset is used
A single dataset uses data_url as the input, and data_dir (ie: workroot + '/data') as the calling method
of the dataset in the image.
For details, please refer to the following sample code.
"""
import os
import argparse
from dataset_8 import create_dataset_parallel
import moxing as mox
from config import mnist_cfg as cfg
from dataset import create_dataset
from lenet import LeNet5
import mindspore.nn as nn
from mindspore import context
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
from mindspore.context import ParallelMode
from mindspore.communication.management import init, get_rank, get_group_size
import mindspore.ops as ops
from mindspore import Tensor
from mindspore import dtype as mstype
class SoftmaxCrossEntropyExpand(nn.Cell):
    """Softmax cross-entropy loss assembled from individual operators.

    Args:
        sparse: When true, ``label`` is one-hot encoded inside ``construct``
            (depth taken from dimension 1 of ``logit``) before it is
            multiplied with the log-softmax. When false, ``label`` is used
            as passed in.
    """

    def __init__(self, sparse=False):
        super().__init__()
        self.sparse = sparse

        # Operators used to build the softmax.
        self.max = ops.ReduceMax(keep_dims=True)
        self.sub = ops.Sub()
        self.exp = ops.Exp()
        self.sum = ops.ReduceSum(keep_dims=True)
        self.div = ops.RealDiv()

        # One-hot encoding of labels (only used when ``sparse`` is true).
        self.onehot = ops.OneHot()
        self.on_value = Tensor(1.0, mstype.float32)
        self.off_value = Tensor(0.0, mstype.float32)

        # Operators used to turn the softmax into the final loss value.
        self.log = ops.Log()
        self.mul = ops.Mul()
        self.sum_cross_entropy = ops.ReduceSum(keep_dims=False)
        self.mul2 = ops.Mul()
        self.mean = ops.ReduceMean(keep_dims=False)

    def construct(self, logit, label):
        """Compute the loss for ``logit`` against ``label``.

        Returns:
            The mean over the last axis of the negated, label-weighted
            log-softmax sums.
        """
        # Shift the logits by their maximum along the last axis before exponentiating.
        shifted = self.sub(logit, self.max(logit, -1))
        exponentials = self.exp(shifted)
        probabilities = self.div(exponentials, self.sum(exponentials, -1))

        if self.sparse:
            depth = ops.shape(logit)[1]
            label = self.onehot(label, depth, self.on_value, self.off_value)

        log_probabilities = self.log(probabilities)
        weighted = self.mul(log_probabilities, label)
        summed = self.sum_cross_entropy(weighted, -1)
        negated = self.mul2(ops.scalar_to_array(-1.0), summed)
        return self.mean(negated, -1)
# Bind this process to the device named by the DEVICE_ID environment variable,
# configure graph-mode execution on Ascend, then call the communication `init()`.
device_id = int(os.environ.get('DEVICE_ID'))
context.set_context(mode=context.GRAPH_MODE,
                    device_target="Ascend",
                    device_id=device_id)
init()
### Defines whether the task is a training environment or a debugging environment ###
def WorkEnvironment(environment):
    """Return the local working root directory for the given run mode.

    Args:
        environment: ``'train'`` for a training task or ``'debug'`` for a
            debugging task.

    Returns:
        The working-root path string associated with that mode.

    Raises:
        ValueError: If ``environment`` is neither ``'train'`` nor ``'debug'``.
    """
    if environment == 'train':
        workroot = '/home/work/user-job-dir'
    elif environment == 'debug':
        workroot = '/home/work'
    else:
        # Previously this fell through and failed with an opaque
        # UnboundLocalError on the print below.
        raise ValueError(
            "unknown environment {!r}; expected 'train' or 'debug'".format(environment))
    print('current work mode:' + environment + ', workroot:' + workroot)
    return workroot
### Copy single dataset from obs to training image###
def ObsToEnv(obs_data_url, data_dir):
    """Copy the dataset at ``obs_data_url`` into the local ``data_dir``.

    The copy is best-effort: any exception is caught and reported on stdout
    instead of being raised. Always returns ``None``.
    """
    try:
        mox.file.copy_parallel(obs_data_url, data_dir)
        success_message = "Successfully Download {} to {}".format(obs_data_url, data_dir)
        print(success_message)
    except Exception as error:
        failure_prefix = 'moxing download {} to {} failed: '.format(obs_data_url, data_dir)
        print(failure_prefix + str(error))
### Copy the output model to obs###
def EnvToObs(train_dir, obs_train_url):
    """Copy the local ``train_dir`` output to ``obs_train_url``.

    The copy is best-effort: any exception is caught and reported on stdout
    instead of being raised. Always returns ``None``.
    """
    try:
        mox.file.copy_parallel(train_dir, obs_train_url)
        success_message = "Successfully Upload {} to {}".format(train_dir,obs_train_url)
        print(success_message)
    except Exception as error:
        failure_prefix = 'moxing upload {} to {} failed: '.format(train_dir,obs_train_url)
        print(failure_prefix + str(error))
### --data_url,--train_url,--device_target,These 3 parameters must be defined first in a single dataset,
### otherwise an error will be reported.
###There is no need to add these parameters to the running parameters of the Qizhi platform,
###because they are predefined in the background, you only need to define them in your code.
# Command-line options for the script. The path defaults are derived from the
# 'train' working root, so WorkEnvironment('train') is evaluated (and prints)
# once per path option when this module is loaded.
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')
parser.add_argument(
    '--data_url',
    default=WorkEnvironment('train') + '/data/',
    help='path to training/inference dataset folder',
)
parser.add_argument(
    '--train_url',
    default=WorkEnvironment('train') + '/model/',
    help='model folder to save/load',
)
parser.add_argument(
    '--device_target',
    choices=['Ascend', 'CPU'],
    default="Ascend",
    type=str,
    help=('device where the code will be implemented (default: Ascend),'
          'if to use the CPU on the Qizhi platform:device_target=CPU'),
)
parser.add_argument(
    '--epoch_size',
    default=5,
    type=int,
    help='Training epochs.',
)
if __name__ == "__main__":
    # Entry point: fetch the dataset, train LeNet5 in auto-parallel mode and
    # upload the resulting checkpoints.
    args = parser.parse_args()

    # This script always runs as a training task.
    environment = 'train'
    workroot = WorkEnvironment(environment)

    # Local directories in the training image for the dataset and the model output.
    data_dir = workroot + '/data'
    train_dir = workroot + '/model'
    # exist_ok=True avoids the check-then-create race when several device
    # processes start at the same time and share these directories.
    os.makedirs(data_dir, exist_ok=True)
    os.makedirs(train_dir, exist_ok=True)

    # Copy the dataset from obs to the training image.
    ObsToEnv(args.data_url, data_dir)

    # The execution context is configured at module level; the per-argument
    # alternative is kept here for reference:
    # context.set_context(mode=context.GRAPH_MODE,
    #                     device_target=args.device_target)
    ds_train = create_dataset(os.path.join(data_dir, "train"),
                              cfg.batch_size)
    # Alternative dataset constructor kept for reference:
    # ds_train = create_dataset_parallel(os.path.join(data_dir, "train"),
    #                                    cfg.batch_size)
    if ds_train.get_dataset_size() == 0:
        raise ValueError(
            "Please check dataset size > 0 and batch_size <= dataset size")

    # Take the device count from the initialised communication group instead
    # of hard-coding 8, so the script also works on other cluster sizes.
    context.reset_auto_parallel_context()
    context.set_auto_parallel_context(parallel_mode=ParallelMode.AUTO_PARALLEL,
                                      gradients_mean=True,
                                      device_num=get_group_size())

    network = LeNet5(cfg.num_classes)
    # net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
    net_loss = SoftmaxCrossEntropyExpand(sparse=True)
    net_opt = nn.Momentum(network.trainable_params(), cfg.lr, cfg.momentum)
    time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())

    # amp_level="O2" is only passed when the target is Ascend.
    if args.device_target != "Ascend":
        model = Model(network,
                      net_loss,
                      net_opt,
                      metrics={"accuracy": Accuracy()})
    else:
        model = Model(network,
                      net_loss,
                      net_opt,
                      metrics={"accuracy": Accuracy()},
                      amp_level="O2")

    config_ck = CheckpointConfig(
        save_checkpoint_steps=cfg.save_checkpoint_steps,
        keep_checkpoint_max=cfg.keep_checkpoint_max)
    # NOTE(review): every process writes into the same "ckpt_0" directory;
    # confirm whether a per-rank directory (get_rank()) is wanted here.
    ckpt_save_dir = os.path.join(train_dir, "ckpt_" + str(0))
    ckpoint_cb = ModelCheckpoint(prefix="auto_parallel",
                                 directory=ckpt_save_dir,
                                 config=config_ck)

    print("============== Starting Training ==============")
    # The command-line value wins whenever it is truthy; otherwise use the config value.
    epoch_size = cfg['epoch_size']
    if args.epoch_size:
        epoch_size = args.epoch_size
    print('epoch_size is: ', epoch_size)

    model.train(epoch_size,
                ds_train,
                callbacks=[time_cb, ckpoint_cb, LossMonitor()],
                dataset_sink_mode=True)

    # Copy the trained model data from the local running environment back to obs,
    # so it can be downloaded from the corresponding training task on the Qizhi platform.
    EnvToObs(ckpt_save_dir, args.train_url)