ModelLink/tests/pipeline/extract_log.py

156 lines
5.5 KiB
Python

from abc import ABC, abstractmethod
import json
import argparse
class BaseLogExtractor(ABC):
def __init__(self, input_path:str, out_path_prefix:str):
self.input_path = input_path
self.out_path_prefix = out_path_prefix
self.losses = []
self.memories = []
self.parameters = {"global_batch_size": 0,
"seq_length": 0,
"world_size": 0}
@abstractmethod
def _extract_parameter(self, line: str) -> None:
pass
@abstractmethod
def _extract_iterline(self, line: str) -> None:
pass
@abstractmethod
def _extract_memory(self, line) -> None:
pass
def _extract(self) -> None:
with open(self.input_path, encoding='utf-8') as f:
for line in f:
self._extract_parameter(line)
self._extract_iterline(line)
self._extract_memory(line)
def _save(self) -> None:
loss_path = f"{self.out_path_prefix}_loss.tsv"
with open(loss_path, 'w') as f:
f.write("step\tloss\titer_time\n")
for step, loss, iter_time in self.losses:
f.write(f"{step}\t{loss}\t{iter_time}\n")
memory_path = f"{self.out_path_prefix}_memory.tsv"
with open(memory_path, 'w') as f:
f.write("rank_id\tmemory\n")
for rank_id, memory in sorted(self.memories):
f.write(f"{rank_id}\t{memory}\n")
parameters_path = f"{self.out_path_prefix}_parameters.json"
with open(parameters_path, 'w') as f:
json.dump(self.parameters, f, indent=4)
def __call__(self):
self._extract()
self._save()
class MegatronLogExtractor(BaseLogExtractor):
def _extract_parameter(self, line: str) -> None:
for param in self.parameters.keys():
if line.startswith(f" {param}"):
blank_pos = line.rfind(' ')
self.parameters[param] = int(line[blank_pos:])
def _extract_iterline(self, line: str):
if not line.startswith(" iteration"):
return
backslash_pos = line.find('/')
blank_pos = line.rfind(' ', 0, backslash_pos)
step = line[blank_pos:backslash_pos]
ms_pos = line.find('(ms):')
pipe_pos = line.find('|', ms_pos)
iter_time = line[ms_pos+6: pipe_pos-1]
loss_pos = line.find('lm loss:')
if loss_pos > 0:
bar_pos = line.find('|', loss_pos)
loss = line[loss_pos+9:bar_pos-1]
else:
loss = 'inf'
self.losses.append((int(step), float(loss), float(iter_time)))
def _extract_memory(self, line) -> None:
if not line.startswith("[Rank"):
return
start = 0
while start >= 0:
rsb_pos = line.find(']', start)
rankid = line[start+6:rsb_pos]
mem_pos = line.find('allocated:', rsb_pos)
pipe_pos = line.find('|', mem_pos)
memory = line[mem_pos+11:pipe_pos-1]
self.memories.append((int(rankid), float(memory)))
start = line.find("[Rank", pipe_pos)
class DeepSpeedLogExtractor(BaseLogExtractor):
def __init__(self, input_path: str, out_path_prefix: str):
super().__init__(input_path, out_path_prefix)
self.parameters = {
"train_batch_size": 0,
"seq-length": 0
}
def _extract_parameter(self, line: str) -> None:
for param in self.parameters.keys():
param_pos = line.find(f" \"{param}\":")
if f" \"{param}\":" in line:
colon_pos = line.find(':', param_pos)
comma_pos = line.find(',', colon_pos)
self.parameters[param] = int(line[colon_pos + 1 : comma_pos])
def _extract_iterline(self, line: str):
if not line.startswith("steps: "):
return
step_pos = 0
loss_pos = line.find(' loss:')
iter_time_pos = line.find(' iter time (s):')
iter_time_end = line.find(' samples/sec:')
step = line[step_pos + 7 : loss_pos]
iter_time = line[iter_time_pos + 15 : iter_time_end]
if loss_pos > 0:
loss = line[loss_pos + 6 : iter_time_pos]
else:
loss = 'inf'
self.losses.append((int(step), float(loss), float(iter_time)))
def _extract_memory(self, line) -> None:
if not line.startswith("after 1 iterations memory (MB)"):
return
mem_pos = line.find('allocated: ')
pipe_pos = line.find('|', mem_pos)
memory = line[mem_pos + 11 : pipe_pos - 1]
self.memories.append((0, float(memory)))
def main(args):
if args.frame_kind.lower() == 'megatron':
MegatronLogExtractor(args.input_path, args.output_path_prefix)()
if args.frame_kind.lower() == 'deepspeed':
DeepSpeedLogExtractor(args.input_path, args.output_path_prefix)()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='extract loss, performance and memory data from training log')
parser.add_argument('frame_kind', help='The training frame: Megatron, Deepspeed or Torch')
parser.add_argument('input_path', help='The training log path')
parser.add_argument('output_path_prefix', help='The output path prefix')
args = parser.parse_args()
main(args)