Update 'train_for_multidataset_dataparallel.py'

liuzx 2022-08-04 09:19:02 +08:00
parent 189a3aed03
commit dd8b498193
1 changed file with 249 additions and 249 deletions


@@ -1,249 +1,249 @@
"""
######################## multi-dataset train lenet example ########################
This example is a multi-dataset training tutorial. For a single dataset, please refer to the single-dataset
training tutorial train.py. This example cannot be used with a single dataset!
"""
"""
######################## Instructions for using the training environment ########################
1. (1) The structure of the dataset uploaded for multi-dataset training in this example
MNISTData.zip
test
t10k-images-idx3-ubyte
t10k-labels-idx1-ubyte
train
train-images-idx3-ubyte
train-labels-idx1-ubyte
checkpoint_lenet-1_1875.zip
checkpoint_lenet-1_1875.ckpt
(2) The dataset structure inside the training image for the multiple datasets in this example
workroot
MNISTData
| test
| train
checkpoint_lenet-1_1875
checkpoint_lenet-1_1875.ckpt
2. Multi-dataset training requires the following predefined functions
(1) Determine whether the task runs in the training environment or the debugging environment.
def WorkEnvironment(environment):
if environment == 'train':
workroot = '/home/work/user-job-dir' #The training task uses this parameter to represent the local path of the training image
elif environment == 'debug':
workroot = '/home/ma-user/work' #The debug task uses this parameter to represent the local path of the debug image
print('current work mode:' + environment + ', workroot:' + workroot)
return workroot
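    ***A minimal usage sketch of WorkEnvironment (illustrative only; the returned paths are the ones shown above):
        workroot = WorkEnvironment('train')   # -> '/home/work/user-job-dir'
        workroot = WorkEnvironment('debug')   # -> '/home/ma-user/work'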
(2) Copy multiple datasets from obs to the training image
def MultiObsToEnv(multi_data_url, workroot):
multi_data_json = json.loads(multi_data_url) #Parse multi_data_url
for i in range(len(multi_data_json)):
path = workroot + "/" + multi_data_json[i]["dataset_name"]
if not os.path.exists(path):
os.makedirs(path)
try:
mox.file.copy_parallel(multi_data_json[i]["dataset_url"], path)
print("Successfully Download {} to {}".format(multi_data_json[i]["dataset_url"],
path))
except Exception as e:
print('moxing download {} to {} failed: '.format(
multi_data_json[i]["dataset_url"], path) + str(e))
return
***The input and output of the MultiObsToEnv function in this example
Input for multi_data_url
[
{
"dataset_url": "s3://test-opendata/attachment/e/a/eae3a316-42d6-4a43-a484-1fa573eab388e
ae3a316-42d6-4a43-a484-1fa573eab388/", #obs path of the dataset
"dataset_name": "MNIST_Data" #the name of the dataset
},
{
"dataset_url": "s3://test-opendata/attachment/2/c/2c59be66-64ec-41ca-b311-f51a486eabf82c
59be66-64ec-41ca-b311-f51a486eabf8/",
"dataset_name": "checkpoint_lenet-1_1875"
}
]
Purpose of multi_data_url:
The purpose of the MultiObsToEnv function is to copy multiple datasets from obs to the training image
and build the dataset path in the training image.
For example, the path of the MNIST_Data dataset in this example is /home/work/user-job-dir/MNISTData,
and the path to the checkpoint_lenet-1_1875 dataset is /home/work/user-job-dir/checkpoint_lenet-1_1875
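    ***A minimal calling sketch of MultiObsToEnv (illustrative only; the JSON string and obs paths below are
    placeholders standing in for the real multi_data_url passed by the platform):
        multi_data_url = json.dumps([
            {"dataset_url": "s3://bucket/path/MNIST_Data/", "dataset_name": "MNIST_Data"},
            {"dataset_url": "s3://bucket/path/ckpt/", "dataset_name": "checkpoint_lenet-1_1875"}
        ])
        MultiObsToEnv(multi_data_url, '/home/work/user-job-dir')
        # After the call, the datasets are available locally at:
        #   /home/work/user-job-dir/MNIST_Data
        #   /home/work/user-job-dir/checkpoint_lenet-1_1875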
(3) Copy the output model to obs.
def EnvToObs(train_dir, obs_train_url):
try:
mox.file.copy_parallel(train_dir, obs_train_url)
print("Successfully Upload {} to {}".format(train_dir,
obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(train_dir,
obs_train_url) + str(e))
return
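    ***A minimal usage sketch of EnvToObs (illustrative only; the obs path below is a placeholder):
        EnvToObs('/home/work/user-job-dir/model', 's3://bucket/output/')
        # uploads everything under the local model folder to the given obs path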
3. Four parameters need to be defined
--data_url is the first dataset you selected on the Qizhi platform
--multi_data_url is the multi-dataset you selected on the Qizhi platform
--data_url, --multi_data_url, --train_url, --device_target: these 4 parameters must be defined first in a
multi-dataset task, otherwise an error will be reported.
There is no need to add these parameters to the running parameters of the Qizhi platform,
because they are predefined in the background; you only need to define them in your code.
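    ***For orientation, the background launch is roughly equivalent to the following command (a sketch only;
    the actual values are injected by the Qizhi platform):
        python train_for_multidataset_dataparallel.py \
            --multi_data_url '[{"dataset_url": "s3://...", "dataset_name": "MNIST_Data"}]' \
            --train_url 's3://bucket/output/' \
            --device_target Ascend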
4. How the dataset is used
Multi-dataset training takes multi_data_url as input and uses workroot + dataset name + file or folder name
inside the dataset as the calling path of the dataset in the training image.
For example, the calling path of the train folder in the MNIST_Data dataset in this example is
workroot + "/MNIST_Data" +"/train"
For details, please refer to the following sample code.
"""
import os
import argparse
import moxing as mox
from config import mnist_cfg as cfg
from dataset_distributed import create_dataset_parallel
from dataset import create_dataset
from lenet import LeNet5
import json
import mindspore.nn as nn
from mindspore import context
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
from mindspore.common import set_seed
from mindspore import load_checkpoint, load_param_into_net
from mindspore.context import ParallelMode
from mindspore.communication.management import init, get_rank, get_group_size
import mindspore.ops as ops
# set device_id and init
device_id = int(os.getenv('ASCEND_DEVICE_ID', '0'))  # default to device 0 if the variable is unset
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
context.set_context(device_id=device_id)
init()
### Defines whether the task is a training environment or a debugging environment ###
def WorkEnvironment(environment):
if environment == 'train':
workroot = '/home/work/user-job-dir'
elif environment == 'debug':
workroot = '/home/ma-user/work'
print('current work mode:' + environment + ', workroot:' + workroot)
return workroot
### Copy multiple datasets from obs to training image ###
def MultiObsToEnv(multi_data_url, workroot):
multi_data_json = json.loads(multi_data_url)
for i in range(len(multi_data_json)):
path = workroot + "/" + multi_data_json[i]["dataset_name"]
if not os.path.exists(path):
os.makedirs(path)
try:
mox.file.copy_parallel(multi_data_json[i]["dataset_url"], path)
print("Successfully Download {} to {}".format(multi_data_json[i]["dataset_url"],
path))
except Exception as e:
print('moxing download {} to {} failed: '.format(
multi_data_json[i]["dataset_url"], path) + str(e))
return
### Copy the output model to obs ###
def EnvToObs(train_dir, obs_train_url):
try:
mox.file.copy_parallel(train_dir, obs_train_url)
print("Successfully Upload {} to {}".format(train_dir,
obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(train_dir,
obs_train_url) + str(e))
return
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')
### --data_url, --multi_data_url, --train_url, --device_target: these 4 parameters must be defined first in a
### multi-dataset task, otherwise an error will be reported.
### There is no need to add these parameters to the running parameters of the Qizhi platform,
### because they are predefined in the background, you only need to define them in your code.
parser.add_argument('--data_url',
help='path to training/inference dataset folder',
default= WorkEnvironment('train') + '/data/')
parser.add_argument('--multi_data_url',
help='path to multi dataset',
default= WorkEnvironment('train'))
parser.add_argument('--train_url',
help='model folder to save/load',
default= WorkEnvironment('train') + '/model/')
parser.add_argument(
'--device_target',
type=str,
default="Ascend",
choices=['Ascend', 'CPU'],
    help='device where the code will be implemented (default: Ascend); to use the CPU on the Qizhi platform, set device_target=CPU')
parser.add_argument('--epoch_size',
type=int,
default=5,
help='Training epochs.')
set_seed(114514)
if __name__ == "__main__":
args = parser.parse_args()
    # After defining the training environment, first execute the WorkEnvironment and MultiObsToEnv functions to
    # copy multiple datasets from obs to the training image
environment = 'train'
workroot = WorkEnvironment(environment)
MultiObsToEnv(args.multi_data_url, workroot)
### Define the output path in the training image
train_dir = workroot + '/model'
if not os.path.exists(train_dir):
os.makedirs(train_dir)
    ### Set up the data parallel context ###
context.reset_auto_parallel_context()
context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
ds_train = create_dataset_parallel(os.path.join(workroot + "/MNISTData", "train"),
cfg.batch_size)
if ds_train.get_dataset_size() == 0:
raise ValueError(
"Please check dataset size > 0 and batch_size <= dataset size")
network = LeNet5(cfg.num_classes)
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
net_opt = nn.Momentum(network.trainable_params(), cfg.lr, cfg.momentum)
time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
    ### Load the trained model: workroot + "/checkpoint_lenet-1_1875" + "/checkpoint_lenet-1_1875.ckpt"
load_param_into_net(network, load_checkpoint(os.path.join(workroot + "/checkpoint_lenet-1_1875",
"checkpoint_lenet-1_1875.ckpt")))
    if args.device_target != "Ascend":
        model = Model(network, net_loss, net_opt, metrics={"accuracy": Accuracy()})
    else:
        model = Model(network, net_loss, net_opt, metrics={"accuracy": Accuracy()}, amp_level="O2")
config_ck = CheckpointConfig(save_checkpoint_steps=cfg.save_checkpoint_steps,
keep_checkpoint_max=cfg.keep_checkpoint_max)
    # Note that this method saves a model file on each card, so a distinct save path must be specified per card;
    # in this example, get_rank() is appended to the directory to distinguish the paths.
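    # For example (assuming an 8-card data parallel job), rank 0 writes its checkpoints under
    # train_dir + "/0/", rank 1 under train_dir + "/1/", and so on, so the cards never
    # overwrite each other's checkpoint files.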
ckpoint_cb = ModelCheckpoint(prefix="data_parallel",
directory=train_dir + "/" + str(get_rank()) + "/",
config=config_ck)
print("============== Starting Training ==============")
    epoch_size = cfg.epoch_size
    if args.epoch_size:
        epoch_size = args.epoch_size
print('epoch_size is: ', epoch_size)
model.train(epoch_size,
ds_train,
callbacks=[time_cb, ckpoint_cb,
LossMonitor()])
    ### Copy the trained model data from the local running environment back to obs,
    ### so that it can be downloaded from the corresponding training task on the Qizhi platform.
EnvToObs(train_dir, args.train_url)
"""
######################## multi-dataset train lenet example ########################
This example is a multi-dataset training tutorial. If it is a single dataset, please refer to the single dataset
training tutorial train.py. This example cannot be used for a single dataset!
"""
"""
######################## Instructions for using the training environment ########################
1(1)The structure of the dataset uploaded for multi-dataset training in this example
MNISTData.zip
test
t10k-images-idx3-ubyte
t10k-labels-idx1-ubyte
train
train-images-idx3-ubyte
train-labels-idx1-ubyte
checkpoint_lenet-1_1875.zip
checkpoint_lenet-1_1875.ckpt
(2)The dataset structure in the training image for multiple datasets in this example
workroot
MNISTData
| test
| train
checkpoint_lenet-1_1875
checkpoint_lenet-1_1875.ckpt
2Multi-dataset training requires predefined functions
(1)Defines whether the task is a training environment or a debugging environment.
def WorkEnvironment(environment):
if environment == 'train':
workroot = '/home/work/user-job-dir' #The training task uses this parameter to represent the local path of the training image
elif environment == 'debug':
workroot = '/home/ma-user/work' #The debug task uses this parameter to represent the local path of the debug image
print('current work mode:' + environment + ', workroot:' + workroot)
return workroot
(2)Copy multiple datasets from obs to training image
def MultiObsToEnv(multi_data_url, workroot):
multi_data_json = json.loads(multi_data_url) #Parse multi_data_url
for i in range(len(multi_data_json)):
path = workroot + "/" + multi_data_json[i]["dataset_name"]
if not os.path.exists(path):
os.makedirs(path)
try:
mox.file.copy_parallel(multi_data_json[i]["dataset_url"], path)
print("Successfully Download {} to {}".format(multi_data_json[i]["dataset_url"],
path))
except Exception as e:
print('moxing download {} to {} failed: '.format(
multi_data_json[i]["dataset_url"], path) + str(e))
return
***The input and output of the MultiObsToEnv function in this example
Input for multi_data_url
[
{
"dataset_url": "s3://test-opendata/attachment/e/a/eae3a316-42d6-4a43-a484-1fa573eab388e
ae3a316-42d6-4a43-a484-1fa573eab388/", #obs path of the dataset
"dataset_name": "MNIST_Data" #the name of the dataset
},
{
"dataset_url": "s3://test-opendata/attachment/2/c/2c59be66-64ec-41ca-b311-f51a486eabf82c
59be66-64ec-41ca-b311-f51a486eabf8/",
"dataset_name": "checkpoint_lenet-1_1875"
}
]
Purpose of multi_data_url:
The purpose of the MultiObsToEnv function is to copy multiple datasets from obs to the training image
and build the dataset path in the training image.
For example, the path of the MNIST_Data dataset in this example is /home/work/user-job-dir/MNISTData,
The path to the checkpoint_lenet-1_1875 dataset is /home/work/user-job-dir/checkpoint_lenet-1_1875
(3)Copy the output model to obs.
def EnvToObs(train_dir, obs_train_url):
try:
mox.file.copy_parallel(train_dir, obs_train_url)
print("Successfully Upload {} to {}".format(train_dir,
obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(train_dir,
obs_train_url) + str(e))
return
34 parameters need to be defined
--data_url is the first dataset you selected on the Qizhi platform
--multi_data_url is the multi-dataset you selected on the Qizhi platform
--data_url,--multi_data_url,--train_url,--device_target,These 4 parameters must be defined first in a multi-dataset task,
otherwise an error will be reported.
There is no need to add these parameters to the running parameters of the Qizhi platform,
because they are predefined in the background, you only need to define them in your code
4How the dataset is used
Multi-datasets use multi_data_url as input, workroot + dataset name + file or folder name in the dataset as the
calling path of the dataset in the training image.
For example, the calling path of the train folder in the MNIST_Data dataset in this example is
workroot + "/MNIST_Data" +"/train"
For details, please refer to the following sample code.
"""
import os
import argparse
import moxing as mox
from config import mnist_cfg as cfg
from dataset_distributed import create_dataset_parallel
from dataset import create_dataset
from lenet import LeNet5
import json
import mindspore.nn as nn
from mindspore import context
from mindspore.train.callback import ModelCheckpoint, CheckpointConfig, LossMonitor, TimeMonitor
from mindspore.train import Model
from mindspore.nn.metrics import Accuracy
from mindspore.common import set_seed
from mindspore import load_checkpoint, load_param_into_net
from mindspore.context import ParallelMode
from mindspore.communication.management import init, get_rank, get_group_size
import mindspore.ops as ops
# set device_id and init
device_id = int(os.getenv('ASCEND_DEVICE_ID'))
context.set_context(mode=context.GRAPH_MODE, device_target="Ascend")
context.set_context(device_id=device_id)
init()
### Defines whether the task is a training environment or a debugging environment ###
def WorkEnvironment(environment):
if environment == 'train':
workroot = '/home/work/user-job-dir'
elif environment == 'debug':
workroot = '/home/ma-user/work'
print('current work mode:' + environment + ', workroot:' + workroot)
return workroot
### Copy multiple datasets from obs to training image ###
def MultiObsToEnv(multi_data_url, workroot):
multi_data_json = json.loads(multi_data_url)
for i in range(len(multi_data_json)):
path = workroot + "/" + multi_data_json[i]["dataset_name"]
if not os.path.exists(path):
os.makedirs(path)
try:
mox.file.copy_parallel(multi_data_json[i]["dataset_url"], path)
print("Successfully Download {} to {}".format(multi_data_json[i]["dataset_url"],
path))
except Exception as e:
print('moxing download {} to {} failed: '.format(
multi_data_json[i]["dataset_url"], path) + str(e))
return
### Copy the output model to obs ###
def EnvToObs(train_dir, obs_train_url):
try:
mox.file.copy_parallel(train_dir, obs_train_url)
print("Successfully Upload {} to {}".format(train_dir,
obs_train_url))
except Exception as e:
print('moxing upload {} to {} failed: '.format(train_dir,
obs_train_url) + str(e))
return
parser = argparse.ArgumentParser(description='MindSpore Lenet Example')
### --data_url,--multi_data_url,--train_url,--device_target,These 4 parameters must be defined first in a multi-dataset,
### otherwise an error will be reported.
### There is no need to add these parameters to the running parameters of the Qizhi platform,
### because they are predefined in the background, you only need to define them in your code.
parser.add_argument('--data_url',
help='path to training/inference dataset folder',
default= WorkEnvironment('train') + '/data/')
parser.add_argument('--multi_data_url',
help='path to multi dataset',
default= WorkEnvironment('train'))
parser.add_argument('--train_url',
help='model folder to save/load',
default= WorkEnvironment('train') + '/model/')
parser.add_argument(
'--device_target',
type=str,
default="Ascend",
choices=['Ascend', 'CPU'],
help='device where the code will be implemented (default: Ascend),if to use the CPU on the Qizhi platform:device_target=CPU')
parser.add_argument('--epoch_size',
type=int,
default=5,
help='Training epochs.')
set_seed(114514)
if __name__ == "__main__":
args = parser.parse_args()
# After defining the training environment, first execute the WorkEnv function and the GetMultiDataPath function to
# copy multiple datasets from obs to the training image
environment = 'train'
workroot = WorkEnvironment(environment)
MultiObsToEnv(args.multi_data_url, workroot)
### Define the output path in the training image
train_dir = workroot + '/model'
if not os.path.exists(train_dir):
os.makedirs(train_dir)
### Copy the dataset from obs to the training image ###
context.reset_auto_parallel_context()
context.set_auto_parallel_context(parallel_mode=ParallelMode.DATA_PARALLEL, gradients_mean=True)
ds_train = create_dataset_parallel(os.path.join(workroot + "/MNISTData", "train"),
cfg.batch_size)
if ds_train.get_dataset_size() == 0:
raise ValueError(
"Please check dataset size > 0 and batch_size <= dataset size")
network = LeNet5(cfg.num_classes)
net_loss = nn.SoftmaxCrossEntropyWithLogits(sparse=True, reduction="mean")
net_opt = nn.Momentum(network.trainable_params(), cfg.lr, cfg.momentum)
time_cb = TimeMonitor(data_size=ds_train.get_dataset_size())
### Load the trained model:workroot + "/checkpoint_lenet-1_1875"+"/checkpoint_lenet-1_1875.ckpt"
load_param_into_net(network, load_checkpoint(os.path.join(workroot + "/checkpoint_lenet-1_1875",
"checkpoint_lenet-1_1875.ckpt")))
if args.device_target != "Ascend":
model = Model(network,net_loss,net_opt,metrics={"accuracy": Accuracy()})
else:
model = Model(network, net_loss,net_opt,metrics={"accuracy": Accuracy()},amp_level="O2")
config_ck = CheckpointConfig(save_checkpoint_steps=cfg.save_checkpoint_steps,
keep_checkpoint_max=cfg.keep_checkpoint_max)
#Note that this method saves the model file on each card. You need to specify the save path on each card.
# In the example, get_rank() is added to distinguish different paths.
ckpoint_cb = ModelCheckpoint(prefix="data_parallel",
directory=train_dir + "/" + str(get_rank()) + "/",
config=config_ck)
print("============== Starting Training ==============")
epoch_size = cfg['epoch_size']
if (args.epoch_size):
epoch_size = args.epoch_size
print('epoch_size is: ', epoch_size)
model.train(epoch_size,
ds_train,
callbacks=[time_cb, ckpoint_cb,
LossMonitor()])
###Copy the trained model data from the local running environment back to obs,
###and download it in the training task corresponding to the Qizhi platform
EnvToObs(train_dir, args.train_url)