# clusterdata/cluster-trace-gpu-v2020/analysis/utils.py
import os
import datetime
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
########### Data Constants ###########
DATA_DIR = '../data/'
# makedirs(exist_ok=True) replaces the racy access()-then-mkdir() check:
# it is atomic with respect to concurrent creation and a no-op if present.
os.makedirs('/tmp/figures', exist_ok=True)
if not os.access('/tmp/figures', os.W_OK):
    print('Cannot write to /tmp/figures, please fix it.')
    exit()
else:
    print('figures saved to /tmp/figures')
########### Prepare Functions ###########
def get_df(file, header=None):
    """Load a headerless CSV and attach column names.

    Column names come from a sibling ``<name>.header`` file (same path with
    ``.csv`` replaced by ``.header``) unless an explicit ``header`` sequence
    is supplied.
    """
    frame = pd.read_csv(file, header=None)
    if header is None:
        header_file = "{}.header".format(file.split('.csv')[0])
        frame.columns = pd.read_csv(header_file).columns
    else:
        frame.columns = header
    return frame
def load_all_df():
    """Load the seven PAI trace tables from DATA_DIR.

    Returns a tuple (dfj, dft, dfi, dfs, dfg, dfp, dfm):
    job, task, instance, sensor, group_tag, machine_spec, machine_metric.
    """
    file_names = [
        'pai_job_table.csv',
        'pai_task_table.csv',
        'pai_instance_table.csv',
        'pai_sensor_table.csv',
        'pai_group_tag_table.csv',
        'pai_machine_spec.csv',
        'pai_machine_metric.csv',
    ]
    return tuple(get_df(DATA_DIR + name) for name in file_names)
def get_dfiw(dfi):
    """Build a per-worker view of the instance table.

    Keeps the last record per worker_name (after sorting by status and
    timestamps), drops records without a worker_name, computes runtime for
    rows whose start/end timestamps are both valid (> 0), and NaNs out the
    timestamps of rows whose start_time is the 0 "never started" placeholder.
    """
    dfiw = dfi.sort_values(['status', 'start_time', 'end_time'])
    dfiw.drop_duplicates(subset=['worker_name'], keep='last', inplace=True)
    dfiw.dropna(subset=['worker_name'], inplace=True)
    valid = (dfiw.start_time > 0) & (dfiw.end_time > 0)
    dfiw['runtime'] = dfiw.loc[valid, 'end_time'] - dfiw.loc[valid, 'start_time']
    # BUG FIX: capture the mask before overwriting start_time. The original
    # re-tested start_time==0 after setting it to NaN, so end_time of
    # never-started rows was never cleared.
    unstarted = dfiw.start_time == 0
    dfiw.loc[unstarted, 'start_time'] = np.nan
    dfiw.loc[unstarted, 'end_time'] = np.nan
    return dfiw
def get_dfw(dfi, dft, dfg):
    """Worker-level view: per-worker instances joined with tasks and group tags."""
    dfw = get_dfiw(dfi)
    dfw['start_date'] = dfw.start_time.apply(pd.Timestamp, unit='s', tz='Asia/Shanghai')
    print('dfi + dft ...')
    dfw = dfw.merge(dft, on=['job_name', 'task_name'], how='left', suffixes=['', '_t'])
    print('dfi + dft + dfg ...')
    # how='left' keeps workers without a group tag; backfill their group with user
    dfw = dfw.merge(dfg, on='inst_id', how='left')
    no_group = dfw.group.isnull()
    dfw.loc[no_group, 'group'] = dfw.loc[no_group, 'user']
    return dfw
def get_dfia(dfi):
    """Aggregate the instance table to (job_name, task_name) granularity.

    Per task: earliest valid start_time, latest valid end_time, mean runtime
    over instances with both timestamps valid, and the max status string.
    """
    keys = ['job_name', 'task_name']
    earliest = dfi[dfi.start_time > 0][keys + ['start_time']].groupby(keys).min()
    latest = dfi[dfi.end_time > 0][keys + ['end_time']].groupby(keys).max()
    finished = dfi[(dfi.start_time > 0) & (dfi.end_time > 0)][keys + ['end_time', 'start_time']]
    finished['runtime'] = finished.end_time - finished.start_time
    mean_runtime = finished.groupby(keys).mean()[['runtime']].reset_index()
    # max() over the status strings picks one representative status per task
    dfia = dfi[keys + ['status']].drop_duplicates().groupby(keys).max()
    for piece in (earliest, latest, mean_runtime):
        dfia = dfia.merge(piece, on=keys, how='left')
    return dfia
def get_dfa(dft, dfj, dfi, dfg):
    """Task-level view: tasks joined with jobs, aggregated instances, group tags.

    Adds runtime (task), duration_min (mean instance runtime, minutes),
    wait_time (instance start - task start) and a tz-aware start_date.
    """
    print('dft + dfj ...')
    dfa = dft.merge(dfj, on=['job_name'], suffixes=['', '_j'])
    # BUG FIX: capture the zero-start mask before NaNing start_time. The
    # original re-tested start_time==0 after the overwrite (then NaN), so
    # end_time of never-started tasks was never cleared.
    unstarted = dfa.start_time == 0
    dfa.loc[unstarted, 'start_time'] = np.nan
    dfa.loc[unstarted, 'end_time'] = np.nan
    dfa['runtime'] = dfa.end_time - dfa.start_time
    print('dft + dfj + dfi ...')
    dfia = get_dfia(dfi)
    dfa = dfa.merge(dfia, on=['job_name', 'task_name'], suffixes=['', '_i'])
    dfa['duration_min'] = dfa.runtime_i / 60  # mean instance runtime, minutes
    dfa['wait_time'] = dfa.start_time_i - dfa.start_time  # task queueing delay
    dfa['start_date'] = dfa.start_time.apply(pd.Timestamp, unit='s', tz='Asia/Shanghai')
    # optional filter (disabled): dfa = dfa[dfa.status=='Terminated']
    print('dft + dfj + dfi + dfg ...')
    # drop dfg's user column to avoid a duplicate; how='left' keeps untagged rows
    dfa = dfa.merge(dfg[[x for x in dfg.columns if x != 'user']], on='inst_id', how='left')
    no_group = dfa.group.isnull()
    dfa.loc[no_group, 'group'] = dfa.loc[no_group, 'user']  # fill missing group with user
    return dfa
def get_dfwitm(dfwit, csv_file='intermediate_data/machine_metric_shennong_machine_all.csv'):
    """Left-join precomputed per-worker machine metrics onto dfwit (key: worker_name)."""
    metrics = pd.read_csv(csv_file, index_col=0)
    # start/end/machine already exist on the left side; keep only new columns
    keep = ~metrics.columns.isin(['start_time', 'end_time', 'machine'])
    return dfwit.merge(metrics.loc[:, keep], on='worker_name', how='left')
########### Plot Functions ###########
# (label, matplotlib linestyle) pairs, used round-robin (index modulo length)
# so that many curves on one axis remain visually distinguishable.
# NOTE(review): a few entries repeat with equivalent dash specs (e.g. 'dashed',
# 'dotted') — harmless, they just recycle the pattern earlier in the cycle.
linestyle_list = [
    ('solid', 'solid'),          # Same as (0, ()) or '-'
    ('dotted', 'dotted'),        # Same as (0, (1, 1)) or ':'
    ('dashed', 'dashed'),        # Same as '--'
    ('dashdot', 'dashdot'),      # Same as '-.'
    ('densely dashdotdotted', (0, (3, 1, 1, 1, 1, 1))),
    ('densely dashdotted', (0, (3, 1, 1, 1))),
    ('densely dotted', (0, (1, 1))),
    ('densely dashed', (0, (5, 1))),
    ('dashdotdotted', (0, (3, 5, 1, 5, 1, 5))),
    ('loosely dashed', (0, (5, 10))),
    ('loosely dashdotted', (0, (3, 10, 1, 10))),
    ('loosely dashdotdotted', (0, (3, 10, 1, 10, 1, 10))),
    ('loosely dotted', (0, (1, 10))),
    ('dashed', (0, (5, 5))),
    ('dashdotted', (0, (3, 5, 1, 5))),
    ('dotted', (0, (1, 1))),
]
def get_cdf(data, inverse=False):
    """Return (sorted_data, percentiles) for plotting a CDF.

    Percentiles run 0..100 across the sorted sample; with inverse=True the
    complementary CDF (CCDF) percentiles are returned instead.
    Handles the degenerate empty and single-element inputs that would
    otherwise divide by zero (len(data) - 1 == 0).
    """
    sorted_data = sorted(data)
    n = len(sorted_data)
    if n > 1:
        p = 100. * np.arange(n) / (n - 1)
    elif n == 1:
        p = np.array([100.])  # a lone point carries the whole mass
    else:
        p = np.array([])
    p = 100. - p if inverse else p  # CCDF
    return sorted_data, p
def plot_data_cdf(data, inverse=False, datalabel=None, xlabel=None, title=None, xlog=False, xlim=None, ylog=False, xticks=None, figsize=(4,3), dpi=120, savefig=None, ylabel=None):
    """Plot the CDF (or CCDF when inverse=True) of a single 1-D sample.

    Saves to /tmp/figures/<savefig>.pdf when savefig is given, else shows
    the figure interactively.
    """
    plt.figure(figsize=figsize, dpi=dpi)
    # BUG FIX: isinstance also matches pd.Series (the common input here, which
    # the original type(...)==pd.DataFrame check missed), and the non-inplace
    # dropna no longer mutates the caller's object.
    if isinstance(data, (pd.Series, pd.DataFrame)):
        data = data.dropna()
    x, y = get_cdf(data, inverse)
    plt.plot(x, y, label=datalabel, color='green', linestyle='-')
    if datalabel is not None:
        plt.legend(loc='lower right')
    if xlog:
        plt.xscale('log')
    if ylog:
        plt.yscale('log')
    if xlim is not None:
        plt.xlim(xlim)
    plt.ylim(0, 100)
    if xlabel is not None:
        plt.xlabel(xlabel)
    if ylabel is not None:
        plt.ylabel(ylabel)
    else:
        plt.ylabel('CCDF' if inverse is True else 'CDF')
    if title is not None:
        plt.title(title)
    if xticks is not None:
        plt.xticks(xticks)
    plt.grid(alpha=.3, linestyle='--')
    if savefig is not None:
        plt.savefig('/tmp/figures/{}.pdf'.format(savefig), bbox_inches='tight')
    else:
        plt.show()
def plot_data_cdfs(data, datalabel=None, inverse=False, xlabel=None, title=None, xlog=False, ylog=False, xticks=None, figsize=(4,3), dpi=120, xlim=None, ylim=None, ylabel=None, yticks=None, savefig=None, loc='best', fontsize=None):
    """Plot CDFs (or CCDFs when inverse=True) of several 1-D samples on one axis.

    data is an iterable of samples; datalabel (if given) supplies one legend
    label per sample. Saves to /tmp/figures/<savefig>.pdf when savefig is
    given, else shows the figure.
    """
    plt.figure(figsize=figsize, dpi=dpi)
    for i, d in enumerate(data):
        # BUG FIX: the original tested type(data) (the container) instead of
        # the element d, so per-series NaNs were never dropped; dropna is also
        # no longer in-place, leaving the caller's data untouched.
        if isinstance(d, (pd.Series, pd.DataFrame)):
            d = d.dropna()
        x, y = get_cdf(d, inverse)
        label = datalabel[i] if datalabel is not None else None
        plt.plot(x, y, label=label, linestyle=linestyle_list[i % len(linestyle_list)][1])
    if datalabel is not None:
        plt.legend(loc=loc, fontsize=fontsize)
    if xlog:
        plt.xscale('log')
    if ylog:
        plt.yscale('log')
    plt.ylim(0, 100) if ylim is None else plt.ylim(ylim)
    if xlim is not None:
        plt.xlim(xlim)
    if xlabel is not None:
        plt.xlabel(xlabel)
    if ylabel is None:
        plt.ylabel('CCDF') if inverse is True else plt.ylabel('CDF')
    else:
        plt.ylabel(ylabel)
    if title is not None:
        plt.title(title)
    if xticks is not None:
        plt.xticks(xticks)
    if yticks is not None:
        plt.yticks(yticks)
    plt.grid(alpha=.3, linestyle='--')
    if savefig is not None:
        plt.savefig('/tmp/figures/{}.pdf'.format(savefig), bbox_inches='tight')
    else:
        plt.show()
def draw_bar_plot(odf, col, figsize=(4,4), dpi=120, portion=False, title=None, limit=30):
    """Horizontal bar chart of the `limit` most frequent values of odf[col].

    With portion=True bars show percentages (relative to the displayed top
    `limit` rows). Returns the counts/portion DataFrame.
    NOTE(review): `title` is accepted but currently unused.
    """
    counts = odf.reset_index().groupby(col).count()[['index']]
    dfout = counts.sort_values('index', ascending=False).head(limit)
    dfout['portion'] = 100 * dfout['index'] / dfout['index'].sum()
    plt.figure(figsize=figsize, dpi=dpi)
    if portion:
        plt.barh(y=dfout.index, width=dfout['portion'])
        plt.xlabel('Percentage (total: %.2f)'%(dfout['index'].sum()))
    else:
        plt.barh(y=dfout.index, width=dfout['index'])
    plt.grid(alpha=.3, linestyle='--')
    return dfout
########### Process Functions ###########
def get_inst_task_num_ratio(dfa, inst_num_list=(2, 8, 20, 64, 100, 256, 512)):
    """For each threshold i, compute the fraction of tasks with inst_num >= i
    and the fraction of all instances those tasks account for.

    Returns a 2-row DataFrame (num_task_ratio / num_inst_ratio) with one
    column per threshold.
    """
    # FIX: tuple default instead of a mutable list default (shared across calls)
    total_num_task = len(dfa)
    total_num_inst = dfa['inst_num'].sum()
    rows = []
    for threshold in inst_num_list:
        subset = dfa[dfa['inst_num'] >= threshold]
        rows.append([len(subset) / total_num_task,
                     subset['inst_num'].sum() / total_num_inst])
    out_df = pd.DataFrame(rows, columns=['num_task_ratio', 'num_inst_ratio'])
    out_df = out_df.T.rename(columns=dict(zip(range(len(inst_num_list)), inst_num_list)))
    return out_df
def add_hour_date(df):
    """Ensure df carries start_date (tz-aware), date and hour columns.

    start_date is derived from the epoch-seconds column, preferring
    start_time_t over start_time. Returns the (mutated) df, or None with a
    message when no usable timestamp column exists.
    """
    if 'start_date' not in df:
        for candidate in ('start_time_t', 'start_time'):
            if candidate in df:
                df['start_date'] = df[candidate].apply(
                    lambda x: pd.Timestamp(x, unit='s', tz='Asia/Shanghai'))
                break
        else:
            print('start_time, start_time_t, dayofyear unfound in df')
            return None
    if 'date' not in df:
        df['date'] = df['start_date'].apply(lambda x: x.date())
    if 'hour' not in df:
        df['hour'] = df['start_date'].apply(lambda x: x.hour)
    return df
def get_hourly_task_request(df):  # df = dftjkix
    """Count tasks per hour for each calendar date.

    Returns a date x hour DataFrame of task counts; days with any missing
    hour are dropped so only "typical" fully-covered days remain.
    Works on a copy, leaving the caller's df untouched.
    """
    df = add_hour_date(df.copy())
    per_day = []
    for date in sorted(df.date.unique()):
        tempdf = df[df.date == date]
        res_df = tempdf.groupby('hour')[['job_name']].count()
        res_df.rename(columns={'job_name': date}, inplace=True)
        per_day.append(res_df.T)
    # FIX: DataFrame.append was removed in pandas 2.0; concat is the supported API
    out_df = pd.concat(per_day) if per_day else pd.DataFrame()
    return out_df.dropna()  # if a day contains hours of NaN, it is not a typical day
def get_hourly_task_resource_request(df, metrics='cpu'):  # df = dftjkix
    """Sum requested resource per hour for each calendar date.

    metrics selects the request column and unit: 'cpu' (plan_cpu/100 = cores),
    'gpu' (plan_gpu/100 = GPUs), 'mem' (plan_mem/1000). Returns a date x hour
    DataFrame; days with any missing hour are dropped.

    Raises ValueError for an unknown metrics value (the original silently
    called exit()).
    """
    # FIX: work on a copy, matching get_hourly_task_request, so the caller's
    # df is not mutated with helper columns
    df = add_hour_date(df.copy())
    scale = {'cpu': ('plan_cpu', 100), 'gpu': ('plan_gpu', 100), 'mem': ('plan_mem', 1000)}
    if metrics not in scale:
        raise ValueError("metrics must be one of 'cpu', 'gpu', 'mem', got {!r}".format(metrics))
    col, divisor = scale[metrics]
    df['plan_resource'] = df[col] / divisor
    per_day = []
    for date in sorted(df.date.unique()):
        tempdf = df[df.date == date]
        res_df = tempdf.groupby('hour')[['plan_resource']].sum()
        # BUG FIX: the original renamed 'job_name' (absent here, a copy-paste
        # leftover), so every transposed row was labeled 'plan_resource'
        # instead of its date; key the rename on the real column.
        res_df.rename(columns={'plan_resource': date}, inplace=True)
        per_day.append(res_df.T)
    # FIX: DataFrame.append was removed in pandas 2.0; concat is the supported API
    out_df = pd.concat(per_day) if per_day else pd.DataFrame()
    return out_df.dropna()  # if a day contains hours of NaN, it is not a typical day
def plan_minus_usg_over_cap_task(dfas):
    """Per-task mean of (planned - used) / machine capacity for gpu/cpu/mem.

    Adds plan_<dev>_minus_usage_over_capacity columns to dfas, averages them
    per (job_name, task_name), prints the over/under-provisioning shares, and
    returns (pgu_datas, ugp_datas, pgu_label, ugp_label): the positive
    (plan > usage) and negated negative (plan < usage) distributions with
    their percentage labels, ordered cpu, gpu, mem.
    """
    # plan/cap columns are in percent for gpu/cpu (hence the 100x capacity
    # scale); mem is already in the capacity's unit
    usage_col = {'gpu': 'gpu_wrk_util', 'cpu': 'cpu_usage', 'mem': 'avg_mem'}
    cap_scale = {'gpu': 100, 'cpu': 100, 'mem': 1}
    for dev in ['gpu', 'cpu', 'mem']:
        col = 'plan_{}_minus_usage_over_capacity'.format(dev)
        dfas[col] = (dfas['plan_' + dev] - dfas[usage_col[dev]]) / (cap_scale[dev] * dfas['cap_' + dev])
    mean_cols = ['plan_{}_minus_usage_over_capacity'.format(d) for d in ['gpu', 'cpu', 'mem']]
    dfas_task = dfas.groupby(['job_name', 'task_name'])[mean_cols].mean()
    pgu_datas, pgu_label, ugp_datas, ugp_label = [], [], [], []
    for device in ['cpu', 'gpu', 'mem']:
        col = 'plan_{}_minus_usage_over_capacity'.format(device)
        apu = dfas_task[~dfas_task[col].isnull()]
        pgu = dfas_task[dfas_task[col] > 0]
        ugp = dfas_task[dfas_task[col] < 0]
        print("{}: plan > usage: {:.2f}%, plan < usage: {:.2f}%".format(
            device, 100 * len(pgu) / len(apu), 100 * len(ugp) / len(apu)))
        pgu_label.append("{} {:.2f}%".format(device, 100 * len(pgu) / len(apu)))
        pgu_datas.append(pgu[col])
        ugp_label.append("{} {:.2f}%".format(device, 100 * len(ugp) / len(apu)))
        ugp_datas.append(-ugp[col])
    return pgu_datas, ugp_datas, pgu_label, ugp_label