JCC-CSScheduler/advisor/internal/task/schedule_scheme.go

230 lines
6.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import (
"fmt"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
"gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector"
"github.com/inhies/go-bytesize"
)
// MakeScheduleScheme is the task that produces a schedule scheme for a job.
// It embeds the *advtsk.MakeAdjustScheme it was created from, so the fields of
// that value (for example Job) are accessible directly on the task.
type MakeScheduleScheme struct {
*advtsk.MakeAdjustScheme
}
// NewMakeScheduleScheme wraps info in a new MakeScheduleScheme task.
// The info pointer is stored as-is; it is not copied.
func NewMakeScheduleScheme(info *advtsk.MakeAdjustScheme) *MakeScheduleScheme {
	tsk := new(MakeScheduleScheme)
	tsk.MakeAdjustScheme = info
	return tsk
}
// Execute runs the task: it builds the schedule scheme, reports the outcome
// through ctx.reporter, flushes the report immediately, and then signals
// completion.
//
// On failure the error text is reported together with an empty scheme; on
// success an empty error text is reported together with the computed scheme.
// complete receives the error from building the scheme and a RemovingDelay of
// one minute.
func (t *MakeScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
	taskLog := logger.WithType[MakeScheduleScheme]("Task")
	taskLog.Debugf("begin")
	defer taskLog.Debugf("end")

	result, err := t.do(task.ID(), ctx)

	// TODO: when the task fails, adjust the failed field of the reported
	// status as the situation requires.
	var errMsg string
	if err != nil {
		errMsg = err.Error()
		// A failed run never reports a (possibly partial) scheme.
		result = jobmod.JobScheduleScheme{}
	}

	// Report the outcome (the schedule scheme on success) to the manager.
	ctx.reporter.Report(task.ID(), advtsk.NewMakeAdjustSchemeStatus(errMsg, result))
	ctx.reporter.ReportNow()

	opt := CompleteOption{RemovingDelay: time.Minute}
	complete(err, opt)
}
// do builds the schedule scheme for t.Job.
//
// It first checks, via CheckResourceAvailability, whether the pre-scheduled
// target node still has enough resources:
//
//   - If it does, the scheme keeps t.Job.TargetSlwNodeID as the target. The
//     per-file actions are derived from the node's score when the score map
//     contains the target node; otherwise every file is scheduled to be
//     loaded/imported.
//   - If it does not, the default scheduler is asked to schedule the job again.
//
// taskID and ctx are currently unused. On error the returned scheme is the
// zero value and must not be used.
func (t *MakeScheduleScheme) do(taskID string, ctx TaskContext) (jobmod.JobScheduleScheme, error) {
	isAvailable, err := t.CheckResourceAvailability()
	if err != nil {
		return jobmod.JobScheduleScheme{}, err
	}

	var defaultSchedule scheduler.DefaultSchedule

	if !isAvailable {
		// Re-run the pre-scheduling to look for the best node.
		s, err := defaultSchedule.Schedule(t.Job)
		if err != nil {
			// The error must be checked before touching s: s may be nil here.
			return jobmod.JobScheduleScheme{}, fmt.Errorf("scheduling job: %w", err)
		}
		if s == nil {
			return jobmod.JobScheduleScheme{}, fmt.Errorf("scheduling job: scheduler returned no scheme")
		}
		return *s, nil
	}

	// Check whether code, dataset and image have already been scheduled to the
	// target node, and generate the scheme accordingly.
	resp, err := defaultSchedule.ComputeAllSlwNodeScore(t.Job.Files)
	if err != nil {
		return jobmod.JobScheduleScheme{}, fmt.Errorf("computing node scores: %w", err)
	}

	score, ok := resp[t.Job.TargetSlwNodeID]
	if !ok {
		// No score for the target node: schedule every file to be transferred.
		return jobmod.JobScheduleScheme{
			TargetSlwNodeID: t.Job.TargetSlwNodeID,
			Code:            jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
			Dataset:         jobmod.FileScheduleScheme{Action: jobmod.ActionLoad},
			Image:           jobmod.FileScheduleScheme{Action: jobmod.ActionImportImage},
		}, nil
	}

	return jobmod.JobScheduleScheme{
		TargetSlwNodeID: t.Job.TargetSlwNodeID,
		Code:            jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeStgAction(score.CodeScore.Isloaded)},
		Dataset:         jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeStgAction(score.DatasetScore.Isloaded)},
		// NOTE(review): the image action is derived from DatasetScore, which
		// looks like a copy-paste slip; presumably an image-specific score was
		// intended — confirm against the score type before changing.
		Image: jobmod.FileScheduleScheme{Action: defaultSchedule.SchemeImageAction(score.DatasetScore.Isloaded)},
	}, nil
}
// CheckResourceAvailability reports whether the pre-scheduled target node
// (t.Job.TargetSlwNodeID) has enough available resources for the job.
//
// Each resource kind the job requests with a value greater than zero is
// checked in this order: CPU, NPU, GPU, MLU, storage, memory. For every such
// kind the available amount is queried through a collector client and must be
// at least 1.5 times the requested amount. The first shortfall is logged and
// ends the check with (false, nil); if no kind falls short the result is
// (true, nil).
//
// An error is returned when no collector client can be acquired, a query
// fails, a response carries an unexpected data type, or an available
// storage/memory size cannot be parsed.
func (t *MakeScheduleScheme) CheckResourceAvailability() (bool, error) {
	// Available resources must cover the request multiplied by this factor.
	const headroom = 1.5

	colCli, err := schglb.CollectorMQPool.Acquire()
	if err != nil {
		return false, fmt.Errorf("new collector client: %w", err)
	}
	defer colCli.Close()

	neededCPU := t.Job.Info.Resources.CPU
	if neededCPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.Job.TargetSlwNodeID,
			uopsdk.ResourceTypeCPU,
		))
		if err != nil {
			return false, fmt.Errorf("getting CPU resource data: %w", err)
		}

		// Use the checked form of the assertion so an unexpected payload
		// becomes an error instead of a panic.
		cpuData, ok := resp.Data.(*uopsdk.CPUResourceData)
		if !ok || cpuData == nil {
			return false, fmt.Errorf("unexpected CPU resource data type %T", resp.Data)
		}

		availCPU := cpuData.Available
		if float64(availCPU.Value) < headroom*neededCPU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient CPU resources, want: %f, available: %d%s", headroom*neededCPU, availCPU.Value, availCPU.Unit)
			return false, nil
		}
	}

	neededNPU := t.Job.Info.Resources.NPU
	if neededNPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.Job.TargetSlwNodeID,
			uopsdk.ResourceTypeNPU,
		))
		if err != nil {
			return false, fmt.Errorf("getting NPU resource data: %w", err)
		}

		npuData, ok := resp.Data.(*uopsdk.NPUResourceData)
		if !ok || npuData == nil {
			return false, fmt.Errorf("unexpected NPU resource data type %T", resp.Data)
		}

		availNPU := npuData.Available
		if float64(availNPU.Value) < headroom*neededNPU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient NPU resources, want: %f, available: %d%s", headroom*neededNPU, availNPU.Value, availNPU.Unit)
			return false, nil
		}
	}

	neededGPU := t.Job.Info.Resources.GPU
	if neededGPU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.Job.TargetSlwNodeID,
			uopsdk.ResourceTypeGPU,
		))
		if err != nil {
			return false, fmt.Errorf("getting GPU resource data: %w", err)
		}

		gpuData, ok := resp.Data.(*uopsdk.GPUResourceData)
		if !ok || gpuData == nil {
			return false, fmt.Errorf("unexpected GPU resource data type %T", resp.Data)
		}

		availGPU := gpuData.Available
		if float64(availGPU.Value) < headroom*neededGPU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient GPU resources, want: %f, available: %d%s", headroom*neededGPU, availGPU.Value, availGPU.Unit)
			return false, nil
		}
	}

	neededMLU := t.Job.Info.Resources.MLU
	if neededMLU > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.Job.TargetSlwNodeID,
			uopsdk.ResourceTypeMLU,
		))
		if err != nil {
			return false, fmt.Errorf("getting MLU resource data: %w", err)
		}

		mluData, ok := resp.Data.(*uopsdk.MLUResourceData)
		if !ok || mluData == nil {
			return false, fmt.Errorf("unexpected MLU resource data type %T", resp.Data)
		}

		availMLU := mluData.Available
		if float64(availMLU.Value) < headroom*neededMLU {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient MLU resources, want: %f, available: %d%s", headroom*neededMLU, availMLU.Value, availMLU.Unit)
			return false, nil
		}
	}

	neededStorage := t.Job.Info.Resources.Storage
	if neededStorage > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.Job.TargetSlwNodeID,
			uopsdk.ResourceTypeStorage,
		))
		if err != nil {
			return false, fmt.Errorf("getting storage resource data: %w", err)
		}

		storageData, ok := resp.Data.(*uopsdk.StorageResourceData)
		if !ok || storageData == nil {
			return false, fmt.Errorf("unexpected storage resource data type %T", resp.Data)
		}

		availStorage := storageData.Available
		// The available amount arrives as value + unit; convert it to bytes
		// so it can be compared with the requested size.
		bytesStorage, err := bytesize.Parse(fmt.Sprintf("%f%s", availStorage.Value, availStorage.Unit))
		if err != nil {
			return false, fmt.Errorf("parsing available storage size: %w", err)
		}

		if int64(bytesStorage) < int64(headroom*float64(neededStorage)) {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient storage resources, want: %s, available: %f%s", bytesize.New(headroom*float64(neededStorage)), availStorage.Value, availStorage.Unit)
			return false, nil
		}
	}

	neededMemory := t.Job.Info.Resources.Memory
	if neededMemory > 0 {
		resp, err := colCli.GetOneResourceData(collector.NewGetOneResourceData(
			t.Job.TargetSlwNodeID,
			uopsdk.ResourceTypeMemory,
		))
		if err != nil {
			return false, fmt.Errorf("getting memory resource data: %w", err)
		}

		memoryData, ok := resp.Data.(*uopsdk.MemoryResourceData)
		if !ok || memoryData == nil {
			return false, fmt.Errorf("unexpected memory resource data type %T", resp.Data)
		}

		availMemory := memoryData.Available
		bytesMemory, err := bytesize.Parse(fmt.Sprintf("%f%s", availMemory.Value, availMemory.Unit))
		if err != nil {
			return false, fmt.Errorf("parsing available memory size: %w", err)
		}

		if int64(bytesMemory) < int64(headroom*float64(neededMemory)) {
			logger.WithField("JobID", t.Job.JobID).
				Infof("insufficient memory resources, want: %s, available: %f%s", bytesize.New(headroom*float64(neededMemory)), availMemory.Value, availMemory.Unit)
			return false, nil
		}
	}

	return true, nil
}
// init passes the NewMakeScheduleScheme constructor to Register when the
// package is initialized.
// NOTE(review): presumably this lets the task framework create a
// MakeScheduleScheme for incoming MakeAdjustScheme requests — confirm against
// Register's definition.
func init() {
Register(NewMakeScheduleScheme)
}