From 719e84b00c11052efeb587e8ec5342e21486438a Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Mon, 6 Nov 2023 16:31:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9A=E4=B9=89=E8=B0=83=E5=BA=A6=E5=99=A8?= =?UTF-8?q?=E7=9A=84=E6=95=B0=E6=8D=AE=E5=BA=93=E6=A8=A1=E5=9E=8B=EF=BC=9B?= =?UTF-8?q?=E4=BB=8E=E6=95=B0=E6=8D=AE=E5=BA=93=E4=B8=AD=E6=9F=A5=E8=AF=A2?= =?UTF-8?q?=E8=AE=A1=E7=AE=97=E4=B8=AD=E5=BF=83=E7=9A=84=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- advisor/internal/scheduler/scheduler.go | 175 ++++++++-------- .../prescheduler/default_prescheduler.go | 194 +++++++++--------- collector/internal/config/config.go | 11 +- collector/internal/mq/pcm.go | 2 +- collector/internal/mq/resource.go | 14 +- collector/internal/mq/slw.go | 48 ++--- collector/internal/mq/storage.go | 6 +- common/assets/confs/manager.config.json | 6 + common/globals/pools.go | 8 +- common/models/job/job.go | 8 +- common/models/job/normal_job.go | 10 +- common/models/models.go | 47 ++++- common/pkgs/db/cc_resource.go | 21 ++ common/pkgs/db/computing_center.go | 27 +++ common/pkgs/db/config/config.go | 21 ++ common/pkgs/db/db.go | 61 ++++++ common/pkgs/db/image.go | 38 ++++ common/pkgs/db/pcm_image.go | 35 ++++ common/pkgs/mq/collector/pcm.go | 7 +- common/pkgs/mq/collector/resource.go | 17 +- common/pkgs/mq/collector/slw.go | 7 +- common/pkgs/mq/collector/storage.go | 8 +- common/pkgs/mq/executor/pcm.go | 21 +- .../mq/executor/task/cache_move_package.go | 6 +- .../executor/task/storage_create_package.go | 6 +- common/pkgs/mq/executor/task/submit_task.go | 22 +- common/pkgs/mq/executor/task/upload_image.go | 28 +-- common/pkgs/mq/manager/computing_center.go | 33 +++ common/pkgs/mq/manager/image.go | 12 +- common/pkgs/mq/manager/server.go | 2 + executor/internal/config/config.go | 4 +- executor/internal/services/pcm.go | 8 +- executor/internal/task/cache_move_package.go | 6 +- executor/internal/task/pcm_schedule_task.go | 8 +- executor/internal/task/pcm_upload_img.go | 7 +- .../internal/task/storage_create_package.go | 4 +- .../internal/task/storage_load_package.go | 4 +- manager/internal/config/config.go | 6 +- manager/internal/imagemgr/imagemgr.go | 92 --------- manager/internal/jobmgr/adjusting_handler.go | 44 ++-- manager/internal/jobmgr/executing_handler.go | 21 +- manager/internal/jobmgr/jobmgr.go | 18 +- .../internal/jobmgr/prescheduling_handler.go | 49 +++-- manager/internal/mq/computing_center.go | 18 ++ manager/internal/mq/image.go | 14 +- manager/internal/mq/service.go | 8 +- manager/main.go | 10 +- 47 files changed, 712 insertions(+), 510 deletions(-) create mode 100644 common/pkgs/db/cc_resource.go create mode 100644 common/pkgs/db/computing_center.go create mode 100644 common/pkgs/db/config/config.go create mode 100644 common/pkgs/db/db.go create mode 100644 common/pkgs/db/image.go create mode 100644 common/pkgs/db/pcm_image.go create mode 100644 common/pkgs/mq/manager/computing_center.go delete mode 100644 manager/internal/imagemgr/imagemgr.go create mode 100644 manager/internal/mq/computing_center.go diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index 11fceef..7c92ec4 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -12,9 +12,10 @@ import ( "gitlink.org.cn/cloudream/common/utils/math" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" ) const ( @@ -39,11 +40,11 @@ type Scheduler interface { Schedule(info *jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) } -type candidateSlwNode struct { - SlwNode uopsdk.SlwNode - IsPreScheduledNode bool // 是否是在预调度时被选中的节点 - Resource resourcesDetail - Files filesDetail +type candidate struct { + CC schmod.ComputingCenter + IsPreScheduled bool // 是否是在预调度时被选中的节点 + Resource resourcesDetail + Files filesDetail } type resourcesDetail struct { @@ -76,20 +77,20 @@ type fileDetail struct { IsLoaded bool //表示storage是否已经调度到该节点, image表示镜像是否已经加载到该算力中心 } -type CandidateSlwNodeArr []*candidateSlwNode +type CandidateArr []*candidate -func (a CandidateSlwNodeArr) Len() int { return len(a) } -func (a CandidateSlwNodeArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a CandidateSlwNodeArr) Less(i, j int) bool { +func (a CandidateArr) Len() int { return len(a) } +func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a CandidateArr) Less(i, j int) bool { n1 := a[i] n2 := a[j] // 如果节点是预调度中选中的节点,那么只要资源满足需求,就优先选择这个节点 - if n1.IsPreScheduledNode && n1.Resource.MaxLevel < ResourceLevel3 { + if n1.IsPreScheduled && n1.Resource.MaxLevel < ResourceLevel3 { return true } - if n2.IsPreScheduledNode && n2.Resource.MaxLevel < ResourceLevel3 { + if n2.IsPreScheduled && n2.Resource.MaxLevel < ResourceLevel3 { return false } @@ -128,46 +129,46 @@ func NewDefaultSchedule() *DefaultScheduler { } func (s *DefaultScheduler) Schedule(job *jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) { - colCli, err := schglb.CollectorMQPool.Acquire() + mgrCli, err := schglb.ManagerMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } - defer schglb.CollectorMQPool.Release(colCli) + defer schglb.ManagerMQPool.Release(mgrCli) - allSlwNodes := make(map[schsdk.SlwNodeID]*candidateSlwNode) + allCCs := make(map[schsdk.CCID]*candidate) // 查询有哪些算力中心可用 - getNodesResp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo()) + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) if err != nil { - return nil, err + return nil, fmt.Errorf("getting all computing center info: %w", err) } - if len(getNodesResp.Nodes) == 0 { + if len(allCC.ComputingCenters) == 0 { return nil, ErrNoAvailableScheme } - for _, slwNode := range getNodesResp.Nodes { - allSlwNodes[slwNode.ID] = &candidateSlwNode{ - SlwNode: slwNode, - IsPreScheduledNode: slwNode.ID == job.TargetSlwNodeID, + for _, cc := range allCC.ComputingCenters { + allCCs[cc.CCID] = &candidate{ + CC: cc, + IsPreScheduled: cc.CCID == job.TargetCCID, } } // 计算 - err = s.calcFileScore(job.Files, allSlwNodes) + err = s.calcFileScore(job.Files, allCCs) if err != nil { return nil, err } - err = s.calcResourceScore(job, allSlwNodes) + err = s.calcResourceScore(job, allCCs) if err != nil { return nil, err } - allSlwNodesArr := lo.Values(allSlwNodes) - sort.Sort(CandidateSlwNodeArr(allSlwNodesArr)) + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) - targetNode := allSlwNodesArr[0] + targetNode := allCCsArr[0] if targetNode.Resource.MaxLevel == ResourceLevel3 { return nil, ErrNoAvailableScheme } @@ -176,24 +177,24 @@ func (s *DefaultScheduler) Schedule(job *jobmod.NormalJob) (*jobmod.JobScheduleS return &scheme, nil } -func (s *DefaultScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) jobmod.JobScheduleScheme { +func (s *DefaultScheduler) makeSchemeForNode(targetCC *candidate) jobmod.JobScheduleScheme { scheme := jobmod.JobScheduleScheme{ - TargetSlwNodeID: targetSlwNode.SlwNode.ID, + TargetCCID: targetCC.CC.CCID, } - if !targetSlwNode.Files.Dataset.IsLoaded { + if !targetCC.Files.Dataset.IsLoaded { scheme.Dataset.Action = jobmod.ActionLoad } else { scheme.Dataset.Action = jobmod.ActionNo } - if !targetSlwNode.Files.Code.IsLoaded { + if !targetCC.Files.Code.IsLoaded { scheme.Code.Action = jobmod.ActionLoad } else { scheme.Code.Action = jobmod.ActionNo } - if !targetSlwNode.Files.Image.IsLoaded { + if !targetCC.Files.Image.IsLoaded { scheme.Image.Action = jobmod.ActionImportImage } else { scheme.Image.Action = jobmod.ActionNo @@ -202,28 +203,28 @@ func (s *DefaultScheduler) makeSchemeForNode(targetSlwNode *candidateSlwNode) jo return scheme } -func (s *DefaultScheduler) calcResourceScore(job *jobmod.NormalJob, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error { - for _, slwNode := range allSlwNodes { - res, err := s.calcOneResourceScore(job.Info.Resources, slwNode.SlwNode.ID) +func (s *DefaultScheduler) calcResourceScore(job *jobmod.NormalJob, allCCs map[schsdk.CCID]*candidate) error { + for _, cc := range allCCs { + res, err := s.calcOneResourceScore(job.Info.Resources, &cc.CC) if err != nil { return err } - slwNode.Resource = *res + cc.Resource = *res } return nil } // 划分节点资源等级,并计算资源得分 -func (s *DefaultScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID schsdk.SlwNodeID) (*resourcesDetail, error) { +func (s *DefaultScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } defer schglb.CollectorMQPool.Release(colCli) - getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID)) + getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) if err != nil { return nil, err } @@ -359,76 +360,76 @@ func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int { } // 计算节点得分情况 -func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error { +func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsdk.CCID]*candidate) error { // 只计算运控返回的计算中心上的存储服务的数据权重 - stgNodeToSlwNode := make(map[int64]*candidateSlwNode) - for _, slwNode := range allSlwNodes { - stgNodeToSlwNode[slwNode.SlwNode.StgNodeID] = slwNode + cdsNodeToCC := make(map[int64]*candidate) + for _, cc := range allCCs { + cdsNodeToCC[cc.CC.CDSNodeID] = cc } //计算code相关得分 - codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, stgNodeToSlwNode) + codeFileScores, err := s.calcPackageFileScore(files.Code.PackageID, cdsNodeToCC) if err != nil { return fmt.Errorf("calc code file score: %w", err) } for id, score := range codeFileScores { - allSlwNodes[id].Files.Code = *score + allCCs[id].Files.Code = *score } //计算dataset相关得分 - datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, stgNodeToSlwNode) + datasetFileScores, err := s.calcPackageFileScore(files.Dataset.PackageID, cdsNodeToCC) if err != nil { return fmt.Errorf("calc dataset file score: %w", err) } for id, score := range datasetFileScores { - allSlwNodes[id].Files.Dataset = *score + allCCs[id].Files.Dataset = *score } //计算image相关得分 - imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allSlwNodes, stgNodeToSlwNode) + imageFileScores, err := s.calcImageFileScore(files.Image.ImageID, allCCs, cdsNodeToCC) if err != nil { return fmt.Errorf("calc image file score: %w", err) } for id, score := range imageFileScores { - allSlwNodes[id].Files.Image = *score + allCCs[id].Files.Image = *score } - for _, slwNode := range allSlwNodes { - slwNode.Files.TotalScore = slwNode.Files.Code.CachingScore + - slwNode.Files.Code.LoadingScore + - slwNode.Files.Dataset.CachingScore + - slwNode.Files.Dataset.LoadingScore + - slwNode.Files.Image.CachingScore + - slwNode.Files.Image.LoadingScore + for _, cc := range allCCs { + cc.Files.TotalScore = cc.Files.Code.CachingScore + + cc.Files.Code.LoadingScore + + cc.Files.Dataset.CachingScore + + cc.Files.Dataset.LoadingScore + + cc.Files.Image.CachingScore + + cc.Files.Image.LoadingScore } return nil } // 计算package在各节点的得分情况 -func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) { +func (s *DefaultScheduler) calcPackageFileScore(packageID int64, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } defer schglb.CollectorMQPool.Release(colCli) - slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail) + ccFileScores := make(map[schsdk.CCID]*fileDetail) cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID)) if err != nil { return nil, err } - for _, stgNodeCacheInfo := range cachedResp.NodeInfos { - slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID] + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] if !ok { continue } - slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{ + ccFileScores[cc.CC.CCID] = &fileDetail{ //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, } } @@ -437,27 +438,27 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNod return nil, err } - for _, stgNodeID := range loadedResp.StgNodeIDs { - slwNode, ok := stgNodeToSlwNode[stgNodeID] + for _, cdsNodeID := range loadedResp.StgNodeIDs { + cc, ok := cdsNodeToCC[cdsNodeID] if !ok { continue } - fsc, ok := slwNodeFileScores[slwNode.SlwNode.ID] + fsc, ok := ccFileScores[cc.CC.CCID] if !ok { fsc = &fileDetail{} - slwNodeFileScores[slwNode.SlwNode.ID] = fsc + ccFileScores[cc.CC.CCID] = fsc } fsc.LoadingScore = 1 * LoadedWeight fsc.IsLoaded = true } - return slwNodeFileScores, nil + return ccFileScores, nil } // 计算package在各节点的得分情况 -func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) { +func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -470,48 +471,50 @@ func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNode } defer schglb.ManagerMQPool.Release(magCli) - imageInfoResp, err := magCli.GetImageInfo(manager.NewGetImageInfo(imageID)) + imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) if err != nil { return nil, fmt.Errorf("getting image info: %w", err) } - slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail) + ccFileScores := make(map[schsdk.CCID]*fileDetail) - cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, imageInfoResp.PackageID)) - if err != nil { - return nil, err - } - - for _, stgNodeCacheInfo := range cachedResp.NodeInfos { - slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID] - if !ok { - continue + if imageInfoResp.Image.CDSPackageID != nil { + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, *imageInfoResp.Image.CDSPackageID)) + if err != nil { + return nil, err } - slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{ - //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } } } // 镜像的LoadingScore是判断是否导入到算力中心 - for _, importing := range imageInfoResp.ImportingInfos { - _, ok := allSlwNodes[importing.SlwNodeID] + for _, pcmImg := range imageInfoResp.PCMImages { + _, ok := allCCs[pcmImg.CCID] if !ok { continue } - fsc, ok := slwNodeFileScores[importing.SlwNodeID] + fsc, ok := ccFileScores[pcmImg.CCID] if !ok { fsc = &fileDetail{} - slwNodeFileScores[importing.SlwNodeID] = fsc + ccFileScores[pcmImg.CCID] = fsc } fsc.LoadingScore = 1 * LoadedWeight fsc.IsLoaded = true } - return slwNodeFileScores, nil + return ccFileScores, nil } func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { diff --git a/client/internal/prescheduler/default_prescheduler.go b/client/internal/prescheduler/default_prescheduler.go index 3e23d34..7d30516 100644 --- a/client/internal/prescheduler/default_prescheduler.go +++ b/client/internal/prescheduler/default_prescheduler.go @@ -12,9 +12,10 @@ import ( "gitlink.org.cn/cloudream/common/utils/math" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" ) const ( @@ -35,8 +36,8 @@ const ( var ErrNoAvailableScheme = fmt.Errorf("no appropriate scheduling node found, please wait") -type candidateSlwNode struct { - SlwNode uopsdk.SlwNode +type candidate struct { + CC schmod.ComputingCenter IsReferencedJobTarget bool // 这个节点是否是所依赖的任务所选择的节点 Resource resourcesDetail Files filesDetail @@ -77,11 +78,11 @@ type schedulingJob struct { Afters []string } -type CandidateSlwNodeArr []*candidateSlwNode +type CandidateArr []*candidate -func (a CandidateSlwNodeArr) Len() int { return len(a) } -func (a CandidateSlwNodeArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a CandidateSlwNodeArr) Less(i, j int) bool { +func (a CandidateArr) Len() int { return len(a) } +func (a CandidateArr) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a CandidateArr) Less(i, j int) bool { n1 := a[i] n2 := a[j] @@ -132,24 +133,25 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP } filesUploadSchemes := make(map[string]schsdk.LocalFileUploadScheme) - colCli, err := schglb.CollectorMQPool.Acquire() + mgrCli, err := schglb.ManagerMQPool.Acquire() if err != nil { return nil, nil, fmt.Errorf("new collector client: %w", err) } - defer schglb.CollectorMQPool.Release(colCli) + defer schglb.ManagerMQPool.Release(mgrCli) // 查询有哪些算力中心可用 - getNodesResp, err := colCli.GetAllSlwNodeInfo(collector.NewGetAllSlwNodeInfo()) + + allCC, err := mgrCli.GetAllComputingCenter(mgrmq.NewGetAllComputingCenter()) if err != nil { - return nil, nil, fmt.Errorf("getting all slw node info: %w", err) + return nil, nil, fmt.Errorf("getting all computing center info: %w", err) } - slwNodes := make(map[schsdk.SlwNodeID]uopsdk.SlwNode) - for _, node := range getNodesResp.Nodes { - slwNodes[node.ID] = node + ccs := make(map[schsdk.CCID]schmod.ComputingCenter) + for _, node := range allCC.ComputingCenters { + ccs[node.CCID] = node } - if len(slwNodes) == 0 { + if len(ccs) == 0 { return nil, nil, ErrNoAvailableScheme } @@ -184,7 +186,7 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP // 经过排序后,按顺序生成调度方案 for _, job := range schJobs { if norJob, ok := job.Job.(*schsdk.NormalJobInfo); ok { - scheme, err := s.scheduleForNormalJob(info, job, slwNodes, jobSetScheme.JobSchemes) + scheme, err := s.scheduleForNormalJob(info, job, ccs, jobSetScheme.JobSchemes) if err != nil { return nil, nil, err } @@ -192,7 +194,7 @@ func (s *DefaultPreScheduler) Schedule(info *schsdk.JobSetInfo) (*jobmod.JobSetP jobSetScheme.JobSchemes[job.Job.GetLocalJobID()] = *scheme // 检查数据文件的配置项,生成上传文件方案 - s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetSlwNodeID, filesUploadSchemes, slwNodes) + s.fillNormarlJobLocalUploadScheme(norJob, scheme.TargetCCID, filesUploadSchemes, ccs) } // 回源任务目前不需要生成调度方案 @@ -256,13 +258,13 @@ func (s *DefaultPreScheduler) orderByAfters(jobs []*schedulingJob) ([]*schedulin return orderedJob, true } -func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, slwNodes map[schsdk.SlwNodeID]uopsdk.SlwNode, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { - allSlwNodes := make(map[schsdk.SlwNodeID]*candidateSlwNode) +func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, job *schedulingJob, ccs map[schsdk.CCID]schmod.ComputingCenter, jobSchemes map[string]jobmod.JobScheduleScheme) (*jobmod.JobScheduleScheme, error) { + allCCs := make(map[schsdk.CCID]*candidate) // 初始化备选节点信息 - for _, slwNode := range slwNodes { - caNode := &candidateSlwNode{ - SlwNode: slwNode, + for _, cc := range ccs { + caNode := &candidate{ + CC: cc, } // 检查此节点是否是它所引用的任务所选的节点 @@ -278,33 +280,33 @@ func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, jo continue } - if scheme.TargetSlwNodeID == slwNode.ID { + if scheme.TargetCCID == cc.CCID { caNode.IsReferencedJobTarget = true break } } - allSlwNodes[slwNode.ID] = caNode + allCCs[cc.CCID] = caNode } norJob := job.Job.(*schsdk.NormalJobInfo) // 计算文件占有量得分 - err := s.calcFileScore(norJob.Files, allSlwNodes) + err := s.calcFileScore(norJob.Files, allCCs) if err != nil { return nil, err } // 计算资源余量得分 - err = s.calcResourceScore(norJob, allSlwNodes) + err = s.calcResourceScore(norJob, allCCs) if err != nil { return nil, err } - allSlwNodesArr := lo.Values(allSlwNodes) - sort.Sort(CandidateSlwNodeArr(allSlwNodesArr)) + allCCsArr := lo.Values(allCCs) + sort.Sort(CandidateArr(allCCsArr)) - targetNode := allSlwNodesArr[0] + targetNode := allCCsArr[0] if targetNode.Resource.MaxLevel == ResourceLevel3 { return nil, ErrNoAvailableScheme } @@ -313,58 +315,58 @@ func (s *DefaultPreScheduler) scheduleForNormalJob(jobSet *schsdk.JobSetInfo, jo return &scheme, nil } -func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetSlwNodeID schsdk.SlwNodeID, schemes map[string]schsdk.LocalFileUploadScheme, slwNodes map[schsdk.SlwNodeID]uopsdk.SlwNode) { +func (s *DefaultPreScheduler) fillNormarlJobLocalUploadScheme(norJob *schsdk.NormalJobInfo, targetCCID schsdk.CCID, schemes map[string]schsdk.LocalFileUploadScheme, ccs map[schsdk.CCID]schmod.ComputingCenter) { if localFile, ok := norJob.Files.Dataset.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { - stgNodeID := slwNodes[targetSlwNodeID].StgNodeID + cdsNodeID := ccs[targetCCID].CDSNodeID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ LocalPath: localFile.LocalPath, - UploadToStgNodeID: &stgNodeID, + UploadToCDSNodeID: &cdsNodeID, } } } if localFile, ok := norJob.Files.Code.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { - stgNodeID := slwNodes[targetSlwNodeID].StgNodeID + cdsNodeID := ccs[targetCCID].CDSNodeID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ LocalPath: localFile.LocalPath, - UploadToStgNodeID: &stgNodeID, + UploadToCDSNodeID: &cdsNodeID, } } } if localFile, ok := norJob.Files.Image.(*schsdk.LocalJobFileInfo); ok { if _, ok := schemes[localFile.LocalPath]; !ok { - stgNodeID := slwNodes[targetSlwNodeID].StgNodeID + cdsNodeID := ccs[targetCCID].CDSNodeID schemes[localFile.LocalPath] = schsdk.LocalFileUploadScheme{ LocalPath: localFile.LocalPath, - UploadToStgNodeID: &stgNodeID, + UploadToCDSNodeID: &cdsNodeID, } } } } -func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targetSlwNode *candidateSlwNode) jobmod.JobScheduleScheme { +func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targetCC *candidate) jobmod.JobScheduleScheme { scheme := jobmod.JobScheduleScheme{ - TargetSlwNodeID: targetSlwNode.SlwNode.ID, + TargetCCID: targetCC.CC.CCID, } // TODO 根据实际情况选择Move或者Load - if _, ok := job.Files.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetSlwNode.Files.Dataset.IsLoaded { + if _, ok := job.Files.Dataset.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Dataset.IsLoaded { scheme.Dataset.Action = jobmod.ActionLoad } else { scheme.Dataset.Action = jobmod.ActionNo } - if _, ok := job.Files.Code.(*schsdk.PackageJobFileInfo); ok && !targetSlwNode.Files.Code.IsLoaded { + if _, ok := job.Files.Code.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Code.IsLoaded { scheme.Code.Action = jobmod.ActionLoad } else { scheme.Code.Action = jobmod.ActionNo } - if _, ok := job.Files.Image.(*schsdk.PackageJobFileInfo); ok && !targetSlwNode.Files.Image.IsLoaded { + if _, ok := job.Files.Image.(*schsdk.PackageJobFileInfo); ok && !targetCC.Files.Image.IsLoaded { scheme.Image.Action = jobmod.ActionImportImage } else { scheme.Image.Action = jobmod.ActionNo @@ -373,28 +375,28 @@ func (s *DefaultPreScheduler) makeSchemeForNode(job *schsdk.NormalJobInfo, targe return scheme } -func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error { - for _, slwNode := range allSlwNodes { - res, err := s.calcOneResourceScore(job.Resources, slwNode.SlwNode.ID) +func (s *DefaultPreScheduler) calcResourceScore(job *schsdk.NormalJobInfo, allCCs map[schsdk.CCID]*candidate) error { + for _, cc := range allCCs { + res, err := s.calcOneResourceScore(job.Resources, &cc.CC) if err != nil { return err } - slwNode.Resource = *res + cc.Resource = *res } return nil } // 划分节点资源等级,并计算资源得分 -func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, slwNodeID schsdk.SlwNodeID) (*resourcesDetail, error) { +func (s *DefaultPreScheduler) calcOneResourceScore(requires schsdk.JobResourcesInfo, cc *schmod.ComputingCenter) (*resourcesDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } defer schglb.CollectorMQPool.Release(colCli) - getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(slwNodeID)) + getResDataResp, err := colCli.GetAllResourceData(collector.NewGetAllResourceData(cc.UOPSlwNodeID)) if err != nil { return nil, err } @@ -530,68 +532,68 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int } // 计算节点得分情况 -func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode) error { +func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { // 只计算运控返回的可用计算中心上的存储服务的数据权重 - stgNodeToSlwNode := make(map[int64]*candidateSlwNode) - for _, slwNode := range allSlwNodes { - stgNodeToSlwNode[slwNode.SlwNode.StgNodeID] = slwNode + cdsNodeToCC := make(map[int64]*candidate) + for _, cc := range allCCs { + cdsNodeToCC[cc.CC.CDSNodeID] = cc } //计算code相关得分 if pkgFile, ok := files.Code.(*schsdk.PackageJobFileInfo); ok { - codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, stgNodeToSlwNode) + codeFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) if err != nil { return fmt.Errorf("calc code file score: %w", err) } for id, score := range codeFileScores { - allSlwNodes[id].Files.Code = *score + allCCs[id].Files.Code = *score } } //计算dataset相关得分 if pkgFile, ok := files.Dataset.(*schsdk.PackageJobFileInfo); ok { - datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, stgNodeToSlwNode) + datasetFileScores, err := s.calcPackageFileScore(pkgFile.PackageID, cdsNodeToCC) if err != nil { return fmt.Errorf("calc dataset file score: %w", err) } for id, score := range datasetFileScores { - allSlwNodes[id].Files.Dataset = *score + allCCs[id].Files.Dataset = *score } } //计算image相关得分 if imgFile, ok := files.Image.(*schsdk.ImageJobFileInfo); ok { //计算image相关得分 - imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allSlwNodes, stgNodeToSlwNode) + imageFileScores, err := s.calcImageFileScore(imgFile.ImageID, allCCs, cdsNodeToCC) if err != nil { return fmt.Errorf("calc image file score: %w", err) } for id, score := range imageFileScores { - allSlwNodes[id].Files.Image = *score + allCCs[id].Files.Image = *score } } - for _, slwNode := range allSlwNodes { - slwNode.Files.TotalScore = slwNode.Files.Code.CachingScore + - slwNode.Files.Code.LoadingScore + - slwNode.Files.Dataset.CachingScore + - slwNode.Files.Dataset.LoadingScore + - slwNode.Files.Image.CachingScore + - slwNode.Files.Image.LoadingScore + for _, cc := range allCCs { + cc.Files.TotalScore = cc.Files.Code.CachingScore + + cc.Files.Code.LoadingScore + + cc.Files.Dataset.CachingScore + + cc.Files.Dataset.LoadingScore + + cc.Files.Image.CachingScore + + cc.Files.Image.LoadingScore } return nil } // 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) { +func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) } defer schglb.CollectorMQPool.Release(colCli) - slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail) + ccFileScores := make(map[schsdk.CCID]*fileDetail) // TODO UserID cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, packageID)) @@ -599,15 +601,15 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw return nil, err } - for _, stgNodeCacheInfo := range cachedResp.NodeInfos { - slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID] + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] if !ok { continue } - slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{ + ccFileScores[cc.CC.CCID] = &fileDetail{ //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, } } @@ -617,27 +619,27 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, stgNodeToSlw return nil, err } - for _, stgNodeID := range loadedResp.StgNodeIDs { - slwNode, ok := stgNodeToSlwNode[stgNodeID] + for _, cdsNodeID := range loadedResp.StgNodeIDs { + cc, ok := cdsNodeToCC[cdsNodeID] if !ok { continue } - sfc, ok := slwNodeFileScores[slwNode.SlwNode.ID] + sfc, ok := ccFileScores[cc.CC.CCID] if !ok { sfc = &fileDetail{} - slwNodeFileScores[slwNode.SlwNode.ID] = sfc + ccFileScores[cc.CC.CCID] = sfc } sfc.LoadingScore = 1 * LoadedWeight sfc.IsLoaded = true } - return slwNodeFileScores, nil + return ccFileScores, nil } // 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwNodes map[schsdk.SlwNodeID]*candidateSlwNode, stgNodeToSlwNode map[int64]*candidateSlwNode) (map[schsdk.SlwNodeID]*fileDetail, error) { +func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -650,48 +652,50 @@ func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allSlwN } defer schglb.ManagerMQPool.Release(magCli) - imageInfoResp, err := magCli.GetImageInfo(manager.NewGetImageInfo(imageID)) + imageInfoResp, err := magCli.GetImageInfo(mgrmq.NewGetImageInfo(imageID)) if err != nil { return nil, fmt.Errorf("getting image info: %w", err) } - slwNodeFileScores := make(map[schsdk.SlwNodeID]*fileDetail) + ccFileScores := make(map[schsdk.CCID]*fileDetail) - cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, imageInfoResp.PackageID)) - if err != nil { - return nil, err - } - - for _, stgNodeCacheInfo := range cachedResp.NodeInfos { - slwNode, ok := stgNodeToSlwNode[stgNodeCacheInfo.NodeID] - if !ok { - continue + if imageInfoResp.Image.CDSPackageID != nil { + cachedResp, err := colCli.PackageGetCachedStgNodes(collector.NewPackageGetCachedStgNodes(0, *imageInfoResp.Image.CDSPackageID)) + if err != nil { + return nil, err } - slwNodeFileScores[slwNode.SlwNode.ID] = &fileDetail{ - //TODO 根据缓存方式不同,可能会有不同的计算方式 - CachingScore: float64(stgNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + for _, cdsNodeCacheInfo := range cachedResp.NodeInfos { + cc, ok := cdsNodeToCC[cdsNodeCacheInfo.NodeID] + if !ok { + continue + } + + ccFileScores[cc.CC.CCID] = &fileDetail{ + //TODO 根据缓存方式不同,可能会有不同的计算方式 + CachingScore: float64(cdsNodeCacheInfo.FileSize) / float64(cachedResp.PackageSize) * CachingWeight, + } } } // 镜像的LoadingScore是判断是否导入到算力中心 - for _, importing := range imageInfoResp.ImportingInfos { - _, ok := allSlwNodes[importing.SlwNodeID] + for _, pcmImg := range imageInfoResp.PCMImages { + _, ok := allCCs[pcmImg.CCID] if !ok { continue } - fsc, ok := slwNodeFileScores[importing.SlwNodeID] + fsc, ok := ccFileScores[pcmImg.CCID] if !ok { fsc = &fileDetail{} - slwNodeFileScores[importing.SlwNodeID] = fsc + ccFileScores[pcmImg.CCID] = fsc } fsc.LoadingScore = 1 * LoadedWeight fsc.IsLoaded = true } - return slwNodeFileScores, nil + return ccFileScores, nil } func findResuorce[T uopsdk.ResourceData](all []uopsdk.ResourceData) T { for _, data := range all { diff --git a/collector/internal/config/config.go b/collector/internal/config/config.go index 3f104c8..0408383 100644 --- a/collector/internal/config/config.go +++ b/collector/internal/config/config.go @@ -2,18 +2,17 @@ package config import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" c "gitlink.org.cn/cloudream/common/utils/config" mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) type Config struct { - Logger log.Config `json:"logger"` - RabbitMQ mymq.Config `json:"rabbitMQ"` - CloudreamStorage stgsdk.Config `json:"cloudreamStorage"` - UnifyOps uopsdk.Config `json:"unifyOps"` - SlwNodes []uopsdk.SlwNode `json:"slwNodes"` + Logger log.Config `json:"logger"` + RabbitMQ mymq.Config `json:"rabbitMQ"` + CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` + UnifyOps uopsdk.Config `json:"unifyOps"` } var cfg Config diff --git a/collector/internal/mq/pcm.go b/collector/internal/mq/pcm.go index 49bd203..b605dc3 100644 --- a/collector/internal/mq/pcm.go +++ b/collector/internal/mq/pcm.go @@ -18,7 +18,7 @@ func (svc *Service) GetImageList(msg *colmq.GetImageList) (*colmq.GetImageListRe defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.GetImageList(pcmsdk.GetImageListReq{ - PartID: msg.SlwNodeID, + PartID: msg.PCMParticipantID, }) if err != nil { logger.Warnf("get image list failed, err: %s", err.Error()) diff --git a/collector/internal/mq/resource.go b/collector/internal/mq/resource.go index 158f08a..3a79be6 100644 --- a/collector/internal/mq/resource.go +++ b/collector/internal/mq/resource.go @@ -21,27 +21,27 @@ func (svc *Service) GetOneResourceData(msg *colmq.GetOneResourceData) (*colmq.Ge switch msg.Type { case uopsdk.ResourceTypeCPU: resp, err = uniOpsCli.GetCPUData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) case uopsdk.ResourceTypeNPU: resp, err = uniOpsCli.GetNPUData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) case uopsdk.ResourceTypeGPU: resp, err = uniOpsCli.GetGPUData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) case uopsdk.ResourceTypeMLU: resp, err = uniOpsCli.GetMLUData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) case uopsdk.ResourceTypeStorage: resp, err = uniOpsCli.GetStorageData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) case uopsdk.ResourceTypeMemory: resp, err = uniOpsCli.GetMemoryData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) default: return nil, mq.Failed(errorcode.OperationFailed, "invalid resource type") @@ -64,7 +64,7 @@ func (svc *Service) GetAllResourceData(msg *colmq.GetAllResourceData) (*colmq.Ge defer schglb.UnifyOpsPool.Release(uniOpsCli) resps, err := uniOpsCli.GetIndicatorData(uopsdk.GetOneResourceDataReq{ - SlwNodeID: msg.SlwNodeID, + SlwNodeID: msg.UOPSlwNodeID, }) if err != nil { logger.Warnf("get all resource data failed, err: %s", err.Error()) diff --git a/collector/internal/mq/slw.go b/collector/internal/mq/slw.go index 6ce32d9..c6bc5c0 100644 --- a/collector/internal/mq/slw.go +++ b/collector/internal/mq/slw.go @@ -6,7 +6,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" - "gitlink.org.cn/cloudream/scheduler/collector/internal/config" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" ) @@ -19,19 +18,15 @@ func (svc *Service) GetSlwNodeInfo(msg *colmq.GetSlwNodeInfo) (*colmq.GetSlwNode } defer schglb.UnifyOpsPool.Release(uniOpsCli) - // resp, err := uniOpsCli.GetAllSlwNodeInfo() - // if err != nil { - // logger.Warnf("get slwNode info failed, err: %s", err.Error()) - // return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed") - // } + resp, err := uniOpsCli.GetAllSlwNodeInfo() + if err != nil { + logger.Warnf("get slwNode info failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed") + } - var resp []uopsdk.SlwNode - // TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的,所以这里进行两个数据源的合并 - resp = svc.mergeConfig(resp) - - node, ok := lo.Find(resp, func(item uopsdk.SlwNode) bool { return item.ID == msg.SlwNodeID }) + node, ok := lo.Find(resp, func(item uopsdk.SlwNode) bool { return item.ID == msg.UOPSlwNodeID }) if !ok { - logger.WithField("SlwNodeID", msg.SlwNodeID). + logger.WithField("SlwNodeID", msg.UOPSlwNodeID). Warnf("slw node not found") return nil, mq.Failed(errorcode.OperationFailed, "slw node not found") } @@ -47,30 +42,11 @@ func (svc *Service) GetAllSlwNodeInfo(msg *colmq.GetAllSlwNodeInfo) (*colmq.GetA } defer schglb.UnifyOpsPool.Release(uniOpsCli) - //resp, err := uniOpsCli.GetAllSlwNodeInfo() - //if err != nil { - // logger.Warnf("get slwNode info failed, err: %s", err.Error()) - // return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed") - //} - - var resp []uopsdk.SlwNode - // TODO 目前计算中心对应的存储系统NodeID和StorageID是写在此服务的配置文件中的,所以这里进行两个数据源的合并 - resp = svc.mergeConfig(resp) + resp, err := uniOpsCli.GetAllSlwNodeInfo() + if err != nil { + logger.Warnf("get slwNode info failed, err: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get slwNode info failed") + } return mq.ReplyOK(colmq.NewGetAllSlwNodeInfoResp(resp)) } - -func (svc *Service) mergeConfig(infos []uopsdk.SlwNode) []uopsdk.SlwNode { - for _, configNode := range config.Cfg().SlwNodes { - infoNode, ok := lo.Find(infos, func(item uopsdk.SlwNode) bool { return item.ID == configNode.ID }) - if !ok { - infos = append(infos, configNode) - continue - } - - infoNode.StgNodeID = configNode.StgNodeID - infoNode.StorageID = configNode.StorageID - } - - return infos -} diff --git a/collector/internal/mq/storage.go b/collector/internal/mq/storage.go index 2df0ec9..ac2cd4f 100644 --- a/collector/internal/mq/storage.go +++ b/collector/internal/mq/storage.go @@ -4,7 +4,7 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/mq" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" ) @@ -17,7 +17,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.PackageGetCachedNodes(stgsdk.PackageGetCachedNodesReq{ + resp, err := stgCli.PackageGetCachedNodes(cdssdk.PackageGetCachedNodesReq{ PackageID: msg.PackageID, UserID: msg.UserID, }) @@ -37,7 +37,7 @@ func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.PackageGetLoadedNodes(stgsdk.PackageGetLoadedNodesReq{ + resp, err := stgCli.PackageGetLoadedNodes(cdssdk.PackageGetLoadedNodesReq{ PackageID: msg.PackageID, UserID: msg.UserID, }) diff --git a/common/assets/confs/manager.config.json b/common/assets/confs/manager.config.json index d0650e3..6f619a4 100644 --- a/common/assets/confs/manager.config.json +++ b/common/assets/confs/manager.config.json @@ -11,6 +11,12 @@ "password": "123456", "vhost": "/" }, + "db": { + "address": "127.0.0.1:3306", + "account": "root", + "password": "123456", + "databaseName": "scheduler" + }, "cloudreamStorage": { "url": "http://localhost:7890" }, diff --git a/common/globals/pools.go b/common/globals/pools.go index 1c9b176..0b49cf9 100644 --- a/common/globals/pools.go +++ b/common/globals/pools.go @@ -2,7 +2,7 @@ package schglb import ( pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor" @@ -26,10 +26,10 @@ func InitMQPool(cfg *scmq.Config) { ManagerMQPool = mgrmq.NewPool(cfg) } -var CloudreamStoragePool stgsdk.Pool +var CloudreamStoragePool cdssdk.Pool -func InitCloudreamStoragePool(cfg *stgsdk.Config) { - CloudreamStoragePool = stgsdk.NewPool(cfg) +func InitCloudreamStoragePool(cfg *cdssdk.Config) { + CloudreamStoragePool = cdssdk.NewPool(cfg) } var UnifyOpsPool uopsdk.Pool diff --git a/common/models/job/job.go b/common/models/job/job.go index a12d48e..46286e0 100644 --- a/common/models/job/job.go +++ b/common/models/job/job.go @@ -23,10 +23,10 @@ type FileScheduleScheme struct { // 任务调度方案 type JobScheduleScheme struct { - TargetSlwNodeID schsdk.SlwNodeID `json:"targetSlwNodeID"` - Dataset FileScheduleScheme `json:"dataset"` - Code FileScheduleScheme `json:"code"` - Image FileScheduleScheme `json:"image"` + TargetCCID schsdk.CCID `json:"targetCCID"` + Dataset FileScheduleScheme `json:"dataset"` + Code FileScheduleScheme `json:"code"` + Image FileScheduleScheme `json:"image"` } // 任务集的预调度方案 diff --git a/common/models/job/normal_job.go b/common/models/job/normal_job.go index 6108c3d..10766dc 100644 --- a/common/models/job/normal_job.go +++ b/common/models/job/normal_job.go @@ -6,10 +6,10 @@ import ( type NormalJob struct { JobBase - Info schsdk.NormalJobInfo `json:"info"` // 提交任务时提供的任务描述信息 - Files JobFiles `json:"files"` // 任务需要的文件 - TargetSlwNodeID schsdk.SlwNodeID `json:"targetSlwNodeID"` // 将要运行此任务的算力中心ID - OutputFullPath string `json:"outputFullPath"` // 程序结果的完整输出路径 + Info schsdk.NormalJobInfo `json:"info"` // 提交任务时提供的任务描述信息 + Files JobFiles `json:"files"` // 任务需要的文件 + TargetCCID schsdk.CCID `json:"targetSlwNodeID"` // 将要运行此任务的算力中心ID + OutputFullPath string `json:"outputFullPath"` // 程序结果的完整输出路径 } func NewNormalJob(jobSetID schsdk.JobSetID, jobID schsdk.JobID, info schsdk.NormalJobInfo) *NormalJob { @@ -40,6 +40,6 @@ type PackageJobFile struct { } type ImageJobFile struct { - PackageID int64 `json:"packageID"` + PackageID *int64 `json:"packageID"` ImageID schsdk.ImageID `json:"imageID"` } diff --git a/common/models/models.go b/common/models/models.go index dabd305..9c383fa 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -1,20 +1,53 @@ package schmod import ( + "time" + + pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" ) type ExecutorID string type AdvisorID string -type ImageInfo struct { - ImageID schsdk.ImageID `json:"imageID"` - PackageID int64 `json:"packageID"` // 镜像文件 - ImportingInfos []ImageImportingInfo `json:"importingInfos"` // 此镜像导入到了哪些节点 +type ComputingCenter struct { + CCID schsdk.CCID `json:"CCID" db:"CCID"` + UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID" db:"UOPSlwNodeID"` + PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID" db:"PCMParticipantID"` + CDSNodeID int64 `json:"cdsNodeID" db:"CDSNodeID"` + CDSStorageID int64 `json:"cdsStorageID" db:"CDSStorageID"` + Name string `json:"name" db:"Name"` } -type ImageImportingInfo struct { - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` - SlwNodeImageID schsdk.SlwNodeImageID `json:"slwNodeImageID"` +type Image struct { + ImageID schsdk.ImageID `json:"imageID" db:"ImageID"` + CDSPackageID *int64 `json:"cdsPackageID" db:"CDSPackageID"` + Name string `json:"name" db:"Name"` + CreateTime time.Time `json:"createTime" db:"CreateTime"` +} + +type PCMImage struct { + ImageID schsdk.ImageID `json:"imageID" db:"ImageID"` + CCID schsdk.CCID `json:"ccID" db:"CCID"` + PCMImageID pcmsdk.ImageID `json:"pcmImageID" db:"PCMImageID"` + Name string `json:"name" db:"Name"` + UploadTime time.Time `json:"uploadTime" db:"UploadTime"` +} + +type CCResource struct { + CCID schsdk.CCID `json:"ccID" db:"CCID"` + PCMResourceID pcmsdk.ResourceID `json:"pcmResourceID" db:"PCMResourceID"` + PCMName string `json:"pcmName" db:"PCMName"` + Resource CCResourceInfo `json:"resource" db:"Resource"` +} + +type CCResourceInfo struct { + CPU float64 `json:"cpu"` + GPU float64 `json:"gpu"` + NPU float64 `json:"npu"` + MLU float64 `json:"mlu"` + Storage int64 `json:"storage"` + Memory int64 `json:"memory"` } diff --git a/common/pkgs/db/cc_resource.go b/common/pkgs/db/cc_resource.go new file mode 100644 index 0000000..3a7a558 --- /dev/null +++ b/common/pkgs/db/cc_resource.go @@ -0,0 +1,21 @@ +package db + +import ( + "github.com/jmoiron/sqlx" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" +) + +type CCResourceDB struct { + *DB +} + +func (db *DB) CCResource() *CCResourceDB { + return &CCResourceDB{DB: db} +} + +func (*CCResourceDB) GetByCCID(ctx SQLContext, id schsdk.CCID) ([]schmod.CCResource, error) { + var ret []schmod.CCResource + err := sqlx.Select(ctx, &ret, "select * from CCResource where CCID = ?", id) + return ret, err +} diff --git a/common/pkgs/db/computing_center.go b/common/pkgs/db/computing_center.go new file mode 100644 index 0000000..cef4bcc --- /dev/null +++ b/common/pkgs/db/computing_center.go @@ -0,0 +1,27 @@ +package db + +import ( + "github.com/jmoiron/sqlx" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" +) + +type ComputingCenterDB struct { + *DB +} + +func (db *DB) ComputingCenter() *ComputingCenterDB { + return &ComputingCenterDB{DB: db} +} + +func (*ComputingCenterDB) GetByID(ctx SQLContext, id schsdk.CCID) (schmod.ComputingCenter, error) { + var ret schmod.ComputingCenter + err := sqlx.Get(ctx, &ret, "select * from ComputingCenter where CCID = ?", id) + return ret, err +} + +func (*ComputingCenterDB) GetAll(ctx SQLContext) ([]schmod.ComputingCenter, error) { + var ret []schmod.ComputingCenter + err := sqlx.Select(ctx, &ret, "select * from ComputingCenter") + return ret, err +} diff --git a/common/pkgs/db/config/config.go b/common/pkgs/db/config/config.go new file mode 100644 index 0000000..9495b71 --- /dev/null +++ b/common/pkgs/db/config/config.go @@ -0,0 +1,21 @@ +package config + +import "fmt" + +type Config struct { + Address string `json:"address"` + Account string `json:"account"` + Password string `json:"password"` + DatabaseName string `json:"databaseName"` +} + +func (cfg *Config) MakeSourceString() string { + return fmt.Sprintf( + "%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true&loc=%s", + cfg.Account, + cfg.Password, + cfg.Address, + cfg.DatabaseName, + "Asia%2FShanghai", + ) +} diff --git a/common/pkgs/db/db.go b/common/pkgs/db/db.go new file mode 100644 index 0000000..6b75d8f --- /dev/null +++ b/common/pkgs/db/db.go @@ -0,0 +1,61 @@ +package db + +import ( + "context" + "database/sql" + "fmt" + + _ "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config" +) + +type DB struct { + d *sqlx.DB +} + +type SQLContext interface { + sqlx.Queryer + sqlx.Execer + sqlx.Ext +} + +func NewDB(cfg *config.Config) (*DB, error) { + db, err := sqlx.Open("mysql", cfg.MakeSourceString()) + if err != nil { + return nil, fmt.Errorf("open database connection failed, err: %w", err) + } + + // 尝试连接一下数据库,如果数据库配置有错误在这里就能报出来 + err = db.Ping() + if err != nil { + return nil, err + } + + return &DB{ + d: db, + }, nil +} + +func (db *DB) DoTx(isolation sql.IsolationLevel, fn func(tx *sqlx.Tx) error) error { + tx, err := db.d.BeginTxx(context.Background(), &sql.TxOptions{Isolation: isolation}) + if err != nil { + return err + } + + if err := fn(tx); err != nil { + tx.Rollback() + return err + } + + if err := tx.Commit(); err != nil { + tx.Rollback() + return err + } + + return nil +} + +func (db *DB) SQLCtx() SQLContext { + return db.d +} diff --git a/common/pkgs/db/image.go b/common/pkgs/db/image.go new file mode 100644 index 0000000..e885ab3 --- /dev/null +++ b/common/pkgs/db/image.go @@ -0,0 +1,38 @@ +package db + +import ( + "fmt" + "time" + + "github.com/jmoiron/sqlx" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" +) + +type ImageDB struct { + *DB +} + +func (db *DB) Image() *ImageDB { + return &ImageDB{DB: db} +} + +func (*ImageDB) GetByID(ctx SQLContext, id schsdk.ImageID) (schmod.Image, error) { + var ret schmod.Image + err := sqlx.Get(ctx, &ret, "select * from Image where ImageID = ?", id) + return ret, err +} + +func (*ImageDB) Create(ctx SQLContext, cdsPackageID *int64, name string, createTime time.Time) (schsdk.ImageID, error) { + ret, err := ctx.Exec("insert into Image(CDSPackageID, Name, CreateTime) values(?, ?, ?)", cdsPackageID, name, createTime) + if err != nil { + return "", err + } + + id, err := ret.LastInsertId() + if err != nil { + return "", err + } + + return schsdk.ImageID(fmt.Sprintf("%d", id)), nil +} diff --git a/common/pkgs/db/pcm_image.go b/common/pkgs/db/pcm_image.go new file mode 100644 index 0000000..486650e --- /dev/null +++ b/common/pkgs/db/pcm_image.go @@ -0,0 +1,35 @@ +package db + +import ( + "time" + + "github.com/jmoiron/sqlx" + pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" +) + +type PCMImageDB struct { + *DB +} + +func (db *DB) PCMImage() *PCMImageDB { + return &PCMImageDB{DB: db} +} + +func (*PCMImageDB) GetByImageID(ctx SQLContext, id schsdk.ImageID) ([]schmod.PCMImage, error) { + var ret []schmod.PCMImage + err := sqlx.Select(ctx, &ret, "select * from PCMImage where ImageID = ?", id) + return ret, err +} + +func (*PCMImageDB) GetByImageIDAndCCID(ctx SQLContext, imageID schsdk.ImageID, ccID schsdk.CCID) (schmod.PCMImage, error) { + var ret schmod.PCMImage + err := sqlx.Get(ctx, &ret, "select * from PCMImage where ImageID = ? and CCID = ?", imageID, ccID) + return ret, err +} + +func (*PCMImageDB) Create(ctx SQLContext, imageID schsdk.ImageID, ccID schsdk.CCID, pcmImageID pcmsdk.ImageID, name string, uploadTime time.Time) error { + _, err := ctx.Exec("insert into PCMImage values(?, ?, ?, ?, ?)", imageID, ccID, pcmImageID, name, uploadTime) + return err +} diff --git a/common/pkgs/mq/collector/pcm.go b/common/pkgs/mq/collector/pcm.go index c7feac6..6f3694a 100644 --- a/common/pkgs/mq/collector/pcm.go +++ b/common/pkgs/mq/collector/pcm.go @@ -3,7 +3,6 @@ package collector import ( "gitlink.org.cn/cloudream/common/pkgs/mq" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" ) type PCMService interface { @@ -15,16 +14,16 @@ var _ = Register(Service.GetImageList) type GetImageList struct { mq.MessageBodyBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` + PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"` } type GetImageListResp struct { mq.MessageBodyBase Images []pcmsdk.Image `json:"images"` } -func NewGetImageList(slwNodeID schsdk.SlwNodeID) *GetImageList { +func NewGetImageList(pcmParticipantID pcmsdk.ParticipantID) *GetImageList { return &GetImageList{ - SlwNodeID: slwNodeID, + PCMParticipantID: pcmParticipantID, } } func NewGetImageListResp(images []pcmsdk.Image) *GetImageListResp { diff --git a/common/pkgs/mq/collector/resource.go b/common/pkgs/mq/collector/resource.go index 6edb35e..1d5ed45 100644 --- a/common/pkgs/mq/collector/resource.go +++ b/common/pkgs/mq/collector/resource.go @@ -2,7 +2,6 @@ package collector import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" ) @@ -17,18 +16,18 @@ var _ = Register(Service.GetOneResourceData) type GetOneResourceData struct { mq.MessageBodyBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` - Type uopsdk.ResourceType `json:"type"` + UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID"` + Type uopsdk.ResourceType `json:"type"` } type GetOneResourceDataResp struct { mq.MessageBodyBase Data uopsdk.ResourceData `json:"data"` } -func NewGetOneResourceData(nodeID schsdk.SlwNodeID, typ uopsdk.ResourceType) *GetOneResourceData { +func NewGetOneResourceData(uopSlwNodeID uopsdk.SlwNodeID, typ uopsdk.ResourceType) *GetOneResourceData { return &GetOneResourceData{ - SlwNodeID: nodeID, - Type: typ, + UOPSlwNodeID: uopSlwNodeID, + Type: typ, } } func NewGetOneResourceDataResp(data uopsdk.ResourceData) *GetOneResourceDataResp { @@ -45,16 +44,16 @@ var _ = Register(Service.GetAllResourceData) type GetAllResourceData struct { mq.MessageBodyBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` + UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID"` } type GetAllResourceDataResp struct { mq.MessageBodyBase Datas []uopsdk.ResourceData `json:"datas"` } -func NewGetAllResourceData(nodeId schsdk.SlwNodeID) *GetAllResourceData { +func NewGetAllResourceData(uopSlwNodeID uopsdk.SlwNodeID) *GetAllResourceData { return &GetAllResourceData{ - SlwNodeID: nodeId, + UOPSlwNodeID: uopSlwNodeID, } } func NewGetAllResourceDataResp(datas []uopsdk.ResourceData) *GetAllResourceDataResp { diff --git a/common/pkgs/mq/collector/slw.go b/common/pkgs/mq/collector/slw.go index 09f949b..fbc4a3c 100644 --- a/common/pkgs/mq/collector/slw.go +++ b/common/pkgs/mq/collector/slw.go @@ -2,7 +2,6 @@ package collector import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" ) @@ -17,16 +16,16 @@ var _ = Register(Service.GetSlwNodeInfo) type GetSlwNodeInfo struct { mq.MessageBodyBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` + UOPSlwNodeID uopsdk.SlwNodeID `json:"uopSlwNodeID"` } type GetSlwNodeInfoResp struct { mq.MessageBodyBase uopsdk.SlwNode } -func NewGetSlwNodeInfo(slwNodeID schsdk.SlwNodeID) *GetSlwNodeInfo { +func NewGetSlwNodeInfo(uopSlwNodeID uopsdk.SlwNodeID) *GetSlwNodeInfo { return &GetSlwNodeInfo{ - SlwNodeID: slwNodeID, + UOPSlwNodeID: uopSlwNodeID, } } func NewGetSlwNodeInfoResp(node uopsdk.SlwNode) *GetSlwNodeInfoResp { diff --git a/common/pkgs/mq/collector/storage.go b/common/pkgs/mq/collector/storage.go index 4c2ee62..f8b181b 100644 --- a/common/pkgs/mq/collector/storage.go +++ b/common/pkgs/mq/collector/storage.go @@ -2,7 +2,7 @@ package collector import ( "gitlink.org.cn/cloudream/common/pkgs/mq" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) type StorageService interface { @@ -21,7 +21,7 @@ type PackageGetCachedStgNodes struct { } type PackageGetCachedStgNodesResp struct { mq.MessageBodyBase - stgsdk.PackageCachingInfo + cdssdk.PackageCachingInfo } func NewPackageGetCachedStgNodes(userID int64, packageID int64) *PackageGetCachedStgNodes { @@ -30,9 +30,9 @@ func NewPackageGetCachedStgNodes(userID int64, packageID int64) *PackageGetCache PackageID: packageID, } } -func NewPackageGetCachedStgNodesResp(nodeInfos []stgsdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *PackageGetCachedStgNodesResp { +func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *PackageGetCachedStgNodesResp { return &PackageGetCachedStgNodesResp{ - PackageCachingInfo: stgsdk.PackageCachingInfo{ + PackageCachingInfo: cdssdk.PackageCachingInfo{ NodeInfos: nodeInfos, PackageSize: packageSize, RedunancyType: redunancyType, diff --git a/common/pkgs/mq/executor/pcm.go b/common/pkgs/mq/executor/pcm.go index ff42443..c6fd8a3 100644 --- a/common/pkgs/mq/executor/pcm.go +++ b/common/pkgs/mq/executor/pcm.go @@ -3,7 +3,6 @@ package executor import ( "gitlink.org.cn/cloudream/common/pkgs/mq" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" ) type PCMService interface { @@ -17,17 +16,17 @@ var _ = Register(Service.DeleteImage) type DeleteImage struct { mq.MessageBodyBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` - ImageID schsdk.SlwNodeImageID `json:"imageID"` + PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"` + PCMImageID pcmsdk.ImageID `json:"pcmImageID"` } type DeleteImageResp struct { mq.MessageBodyBase } -func NewDeleteImage(slwNodeID schsdk.SlwNodeID, imageID schsdk.SlwNodeImageID) *DeleteImage { +func NewDeleteImage(pcmParticipantID pcmsdk.ParticipantID, pcmImageID pcmsdk.ImageID) *DeleteImage { return &DeleteImage{ - SlwNodeID: slwNodeID, - ImageID: imageID, + PCMParticipantID: pcmParticipantID, + PCMImageID: pcmImageID, } } func NewDeleteImageResp() *DeleteImageResp { @@ -42,17 +41,17 @@ var _ = Register(Service.DeleteTask) type DeleteTask struct { mq.MessageBodyBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` - TaskID pcmsdk.TaskID `json:"taskID"` + PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"` + PCMTaskID pcmsdk.TaskID `json:"pcmTaskID"` } type DeleteTaskResp struct { mq.MessageBodyBase } -func NewDeleteTask(slwNodeID schsdk.SlwNodeID, taskID pcmsdk.TaskID) *DeleteTask { +func NewDeleteTask(pcmParticipantID pcmsdk.ParticipantID, pcmTaskID pcmsdk.TaskID) *DeleteTask { return &DeleteTask{ - SlwNodeID: slwNodeID, - TaskID: taskID, + PCMParticipantID: pcmParticipantID, + PCMTaskID: pcmTaskID, } } func NewDeleteTaskResp() *DeleteTaskResp { diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go index 2f5c521..7c2ef9c 100644 --- a/common/pkgs/mq/executor/task/cache_move_package.go +++ b/common/pkgs/mq/executor/task/cache_move_package.go @@ -1,6 +1,6 @@ package task -import stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" var _ = Register[*CacheMovePackage, *CacheMovePackageStatus]() @@ -13,7 +13,7 @@ type CacheMovePackage struct { type CacheMovePackageStatus struct { TaskStatusBase Error string `json:"error"` - CacheInfos []stgsdk.ObjectCacheInfo `json:"cacheInfos"` + CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"` } func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage { @@ -23,7 +23,7 @@ func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheM StgNodeID: stgNodeID, } } -func NewCacheMovePackageStatus(err string, cacheInfos []stgsdk.ObjectCacheInfo) *CacheMovePackageStatus { +func NewCacheMovePackageStatus(err string, cacheInfos []cdssdk.ObjectCacheInfo) *CacheMovePackageStatus { return &CacheMovePackageStatus{ Error: err, CacheInfos: cacheInfos, diff --git a/common/pkgs/mq/executor/task/storage_create_package.go b/common/pkgs/mq/executor/task/storage_create_package.go index 1f07959..f294527 100644 --- a/common/pkgs/mq/executor/task/storage_create_package.go +++ b/common/pkgs/mq/executor/task/storage_create_package.go @@ -1,6 +1,6 @@ package task -import stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" var _ = Register[*StorageCreatePackage, *StorageCreatePackageStatus]() @@ -11,7 +11,7 @@ type StorageCreatePackage struct { Path string `json:"path"` BucketID int64 `json:"bucketID"` Name string `json:"name"` - Redundancy stgsdk.TypedRedundancyInfo `json:"redundancy"` + Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"` } type StorageCreatePackageStatus struct { TaskStatusBase @@ -20,7 +20,7 @@ type StorageCreatePackageStatus struct { PackageID int64 `json:"packageID"` } -func NewStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy stgsdk.TypedRedundancyInfo) *StorageCreatePackage { +func NewStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy cdssdk.TypedRedundancyInfo) *StorageCreatePackage { return &StorageCreatePackage{ UserID: userID, StorageID: storageID, diff --git a/common/pkgs/mq/executor/task/submit_task.go b/common/pkgs/mq/executor/task/submit_task.go index 0fa3f52..73786b2 100644 --- a/common/pkgs/mq/executor/task/submit_task.go +++ b/common/pkgs/mq/executor/task/submit_task.go @@ -9,11 +9,11 @@ var _ = Register[*SubmitTask, *SubmitTaskStatus]() type SubmitTask struct { TaskInfoBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` - SlwNodeImageID schsdk.SlwNodeImageID `json:"slwNodeImageID"` - SlwNodeResourceID pcmsdk.ResourceID `json:"slwNodeResourceID"` - CMD string `json:"cmd"` - Envs []schsdk.KVPair `json:"envs"` + PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"` + PCMImageID pcmsdk.ImageID `json:"pcmImageID"` + PCMResourceID pcmsdk.ResourceID `json:"pcmResourceID"` + CMD string `json:"cmd"` + Envs []schsdk.KVPair `json:"envs"` } type SubmitTaskStatus struct { TaskStatusBase @@ -21,13 +21,13 @@ type SubmitTaskStatus struct { Error string `json:"error"` } -func NewSubmitTask(slwNodeID schsdk.SlwNodeID, slwNodeImageID schsdk.SlwNodeImageID, slwNodeResourceID pcmsdk.ResourceID, cmd string, envs []schsdk.KVPair) *SubmitTask { +func NewSubmitTask(pcmParticipantID pcmsdk.ParticipantID, pcmImageID pcmsdk.ImageID, pcmResourceID pcmsdk.ResourceID, cmd string, envs []schsdk.KVPair) *SubmitTask { return &SubmitTask{ - SlwNodeID: slwNodeID, - SlwNodeImageID: slwNodeImageID, - SlwNodeResourceID: slwNodeResourceID, - CMD: cmd, - Envs: envs, + PCMParticipantID: pcmParticipantID, + PCMImageID: pcmImageID, + PCMResourceID: pcmResourceID, + CMD: cmd, + Envs: envs, } } diff --git a/common/pkgs/mq/executor/task/upload_image.go b/common/pkgs/mq/executor/task/upload_image.go index 1e90b66..af5d889 100644 --- a/common/pkgs/mq/executor/task/upload_image.go +++ b/common/pkgs/mq/executor/task/upload_image.go @@ -1,33 +1,35 @@ package task import ( - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" ) var _ = Register[*UploadImage, *UploadImageStatus]() type UploadImage struct { TaskInfoBase - SlwNodeID schsdk.SlwNodeID `json:"slwNodeID"` - ImagePath string `json:"imagePath"` + PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"` + ImagePath string `json:"imagePath"` } type UploadImageStatus struct { TaskStatusBase - Status string `json:"status"` - Error string `json:"error"` - ImageID schsdk.SlwNodeImageID `json:"imageID"` + Status string `json:"status"` + Error string `json:"error"` + PCMImageID pcmsdk.ImageID `json:"pcmImageID"` + Name string `json:"name"` } -func NewUploadImage(slwNodeID schsdk.SlwNodeID, imagePath string) *UploadImage { +func NewUploadImage(pcmParticipantID pcmsdk.ParticipantID, imagePath string) *UploadImage { return &UploadImage{ - SlwNodeID: slwNodeID, - ImagePath: imagePath, + PCMParticipantID: pcmParticipantID, + ImagePath: imagePath, } } -func NewUploadImageStatus(status string, err string, imageID schsdk.SlwNodeImageID) *UploadImageStatus { +func NewUploadImageStatus(status string, err string, pcmImageID pcmsdk.ImageID, name string) *UploadImageStatus { return &UploadImageStatus{ - Status: status, - Error: err, - ImageID: imageID, + Status: status, + Error: err, + PCMImageID: pcmImageID, + Name: name, } } diff --git a/common/pkgs/mq/manager/computing_center.go b/common/pkgs/mq/manager/computing_center.go new file mode 100644 index 0000000..4addc91 --- /dev/null +++ b/common/pkgs/mq/manager/computing_center.go @@ -0,0 +1,33 @@ +package manager + +import ( + "gitlink.org.cn/cloudream/common/pkgs/mq" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" +) + +type ComputingCenterService interface { + GetAllComputingCenter(msg *GetAllComputingCenter) (*GetAllComputingCenterResp, *mq.CodeMessage) +} + +// 获取所有的算力中心信息 +var _ = Register(Service.GetAllComputingCenter) + +type GetAllComputingCenter struct { + mq.MessageBodyBase +} +type GetAllComputingCenterResp struct { + mq.MessageBodyBase + ComputingCenters []schmod.ComputingCenter `json:"computingCenters"` +} + +func NewGetAllComputingCenter() *GetAllComputingCenter { + return &GetAllComputingCenter{} +} +func NewGetAllComputingCenterResp(ccs []schmod.ComputingCenter) *GetAllComputingCenterResp { + return &GetAllComputingCenterResp{ + ComputingCenters: ccs, + } +} +func (c *Client) GetAllComputingCenter(msg *GetAllComputingCenter, opts ...mq.RequestOption) (*GetAllComputingCenterResp, error) { + return mq.Request(Service.GetAllComputingCenter, c.roundTripper, msg, opts...) +} diff --git a/common/pkgs/mq/manager/image.go b/common/pkgs/mq/manager/image.go index 50855a9..7d71431 100644 --- a/common/pkgs/mq/manager/image.go +++ b/common/pkgs/mq/manager/image.go @@ -51,7 +51,8 @@ type GetImageInfo struct { } type GetImageInfoResp struct { mq.MessageBodyBase - schmod.ImageInfo + Image schmod.Image + PCMImages []schmod.PCMImage } func NewGetImageInfo(imageID schsdk.ImageID) *GetImageInfo { @@ -59,13 +60,10 @@ func NewGetImageInfo(imageID schsdk.ImageID) *GetImageInfo { ImageID: imageID, } } -func NewGetImageInfoResp(imageID schsdk.ImageID, packageID int64, importingInfo []schmod.ImageImportingInfo) *GetImageInfoResp { +func NewGetImageInfoResp(image schmod.Image, pcmImages []schmod.PCMImage) *GetImageInfoResp { return &GetImageInfoResp{ - ImageInfo: schmod.ImageInfo{ - ImageID: imageID, - PackageID: packageID, - ImportingInfos: importingInfo, - }, + Image: image, + PCMImages: pcmImages, } } func (c *Client) GetImageInfo(msg *GetImageInfo, opts ...mq.RequestOption) (*GetImageInfoResp, error) { diff --git a/common/pkgs/mq/manager/server.go b/common/pkgs/mq/manager/server.go index 037649f..89f03b5 100644 --- a/common/pkgs/mq/manager/server.go +++ b/common/pkgs/mq/manager/server.go @@ -12,6 +12,8 @@ const ( type Service interface { AdvisorService + ComputingCenterService + ExecutorService ImageService diff --git a/executor/internal/config/config.go b/executor/internal/config/config.go index 628cd69..9f93441 100644 --- a/executor/internal/config/config.go +++ b/executor/internal/config/config.go @@ -3,7 +3,7 @@ package config import ( log "gitlink.org.cn/cloudream/common/pkgs/logger" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" c "gitlink.org.cn/cloudream/common/utils/config" mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) @@ -12,7 +12,7 @@ type Config struct { Logger log.Config `json:"logger"` ReportIntervalSec int `json:"reportIntervalSec"` RabbitMQ mymq.Config `json:"rabbitMQ"` - CloudreamStorage stgsdk.Config `json:"cloudreamStorage"` + CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` PCM pcmsdk.Config `json:"pcm"` } diff --git a/executor/internal/services/pcm.go b/executor/internal/services/pcm.go index 79cb076..b144205 100644 --- a/executor/internal/services/pcm.go +++ b/executor/internal/services/pcm.go @@ -18,8 +18,8 @@ func (svc *Service) DeleteImage(msg *execmq.DeleteImage) (*execmq.DeleteImageRes defer schglb.PCMPool.Release(pcmCli) err = pcmCli.DeleteImage(pcmsdk.DeleteImageReq{ - PartID: msg.SlwNodeID, - ImageID: msg.ImageID, + PartID: msg.PCMParticipantID, + ImageID: msg.PCMImageID, }) if err != nil { logger.Warnf("delete image failed, err: %s", err.Error()) @@ -37,8 +37,8 @@ func (svc *Service) DeleteTask(msg *execmq.DeleteTask) (*execmq.DeleteTaskResp, defer schglb.PCMPool.Release(pcmCli) err = pcmCli.DeleteTask(pcmsdk.DeleteTaskReq{ - PartID: msg.SlwNodeID, - TaskID: msg.TaskID, + PartID: msg.PCMParticipantID, + TaskID: msg.PCMTaskID, }) if err != nil { logger.Warnf("delete task failed, err: %s", err.Error()) diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go index e0731a0..132c396 100644 --- a/executor/internal/task/cache_move_package.go +++ b/executor/internal/task/cache_move_package.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -39,14 +39,14 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext }) } -func (t *CacheMovePackage) do(ctx TaskContext) ([]stgsdk.ObjectCacheInfo, error) { +func (t *CacheMovePackage) do(ctx TaskContext) ([]cdssdk.ObjectCacheInfo, error) { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { return nil, fmt.Errorf("new cloudream storage client: %w", err) } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.CacheMovePackage(stgsdk.CacheMovePackageReq{ + resp, err := stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ UserID: t.UserID, PackageID: t.PackageID, NodeID: t.StgNodeID, diff --git a/executor/internal/task/pcm_schedule_task.go b/executor/internal/task/pcm_schedule_task.go index dbe4169..73f55f0 100644 --- a/executor/internal/task/pcm_schedule_task.go +++ b/executor/internal/task/pcm_schedule_task.go @@ -48,9 +48,9 @@ func (t *PCMSubmitTask) do(taskID string, ctx TaskContext) error { defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.SubmitTask(pcmsdk.SubmitTaskReq{ - PartID: t.SlwNodeID, - ImageID: t.SlwNodeImageID, - ResourceID: t.SlwNodeResourceID, + PartID: t.PCMParticipantID, + ImageID: t.PCMImageID, + ResourceID: t.PCMResourceID, CMD: t.CMD, Envs: t.Envs, Params: []schsdk.KVPair{}, @@ -63,7 +63,7 @@ func (t *PCMSubmitTask) do(taskID string, ctx TaskContext) error { var prevStatus pcmsdk.TaskStatus for { tsResp, err := pcmCli.GetTask(pcmsdk.GetTaskReq{ - PartID: t.SlwNodeID, + PartID: t.PCMParticipantID, TaskID: resp.TaskID, }) if err != nil { diff --git a/executor/internal/task/pcm_upload_img.go b/executor/internal/task/pcm_upload_img.go index 925780f..6a17506 100644 --- a/executor/internal/task/pcm_upload_img.go +++ b/executor/internal/task/pcm_upload_img.go @@ -7,7 +7,6 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -30,7 +29,7 @@ func (t *PCMUploadImage) Execute(task *task.Task[TaskContext], ctx TaskContext, err := t.do(task.ID(), ctx) if err != nil { //TODO 若任务失败,上报的状态failed字段根据情况修改 - ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), schsdk.SlwNodeImageID(""))) + ctx.reporter.Report(task.ID(), exectsk.NewUploadImageStatus("failed", err.Error(), pcmsdk.ImageID(""), "")) } ctx.reporter.ReportNow() @@ -47,14 +46,14 @@ func (t *PCMUploadImage) do(taskID string, ctx TaskContext) error { defer schglb.PCMPool.Release(pcmCli) resp, err := pcmCli.UploadImage(pcmsdk.UploadImageReq{ - SlwNodeID: t.SlwNodeID, + PartID: t.PCMParticipantID, ImagePath: t.ImagePath, }) if err != nil { return err } - ctx.reporter.Report(taskID, exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID)) + ctx.reporter.Report(taskID, exectsk.NewUploadImageStatus(resp.Result, "", resp.ImageID, resp.Name)) return nil } diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go index 55c2ac2..acf8552 100644 --- a/executor/internal/task/storage_create_package.go +++ b/executor/internal/task/storage_create_package.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -45,7 +45,7 @@ func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error { } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.StorageCreatePackage(stgsdk.StorageCreatePackageReq{ + resp, err := stgCli.StorageCreatePackage(cdssdk.StorageCreatePackageReq{ UserID: t.UserID, StorageID: t.StorageID, Path: t.Path, diff --git a/executor/internal/task/storage_load_package.go b/executor/internal/task/storage_load_package.go index 55cb91f..c55195a 100644 --- a/executor/internal/task/storage_load_package.go +++ b/executor/internal/task/storage_load_package.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/pkgs/task" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" ) @@ -47,7 +47,7 @@ func (t *StorageLoadPackage) do(ctx TaskContext) (string, error) { } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.StorageLoadPackage(stgsdk.StorageLoadPackageReq{ + resp, err := stgCli.StorageLoadPackage(cdssdk.StorageLoadPackageReq{ UserID: t.UserID, PackageID: t.PackageID, StorageID: t.StorageID, diff --git a/manager/internal/config/config.go b/manager/internal/config/config.go index 09878d2..bf66aee 100644 --- a/manager/internal/config/config.go +++ b/manager/internal/config/config.go @@ -2,15 +2,17 @@ package config import ( "gitlink.org.cn/cloudream/common/pkgs/logger" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/config" + db "gitlink.org.cn/cloudream/scheduler/common/pkgs/db/config" scmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq" ) type Config struct { Logger logger.Config `json:"logger"` RabbitMQ scmq.Config `json:"rabbitMQ"` - CloudreamStorage stgsdk.Config `json:"cloudreamStorage"` + DB db.Config `json:"db"` + CloudreamStorage cdssdk.Config `json:"cloudreamStorage"` ReportTimeoutSecs int `json:"reportTimeoutSecs"` } diff --git a/manager/internal/imagemgr/imagemgr.go b/manager/internal/imagemgr/imagemgr.go deleted file mode 100644 index 7743bec..0000000 --- a/manager/internal/imagemgr/imagemgr.go +++ /dev/null @@ -1,92 +0,0 @@ -package imagemgr - -import ( - "fmt" - "sync" - - schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - schmod "gitlink.org.cn/cloudream/scheduler/common/models" -) - -type Manager struct { - infos map[schsdk.ImageID]*schmod.ImageInfo - imageIDIndex int64 - - lock sync.Mutex -} - -func NewManager() (*Manager, error) { - return &Manager{ - infos: make(map[schsdk.ImageID]*schmod.ImageInfo), - }, nil -} - -func (m *Manager) GetImageInfo(imageID schsdk.ImageID) (*schmod.ImageInfo, error) { - m.lock.Lock() - defer m.lock.Unlock() - - info, ok := m.infos[imageID] - if !ok { - return nil, fmt.Errorf("image not found") - } - - return info, nil -} - -func (m *Manager) GetImageImportingInfo(imageID schsdk.ImageID, slwNodeID schsdk.SlwNodeID) (*schmod.ImageImportingInfo, error) { - m.lock.Lock() - defer m.lock.Unlock() - - info, ok := m.infos[imageID] - if !ok { - return nil, fmt.Errorf("image not found") - } - - for _, im := range info.ImportingInfos { - if im.SlwNodeID == slwNodeID { - return &im, nil - } - } - - return nil, fmt.Errorf("no importing info for this slw node") -} - -func (m *Manager) CreateImage(packageID int64) (*schmod.ImageInfo, error) { - m.lock.Lock() - defer m.lock.Unlock() - - imageID := schsdk.ImageID(fmt.Sprintf("%d", m.imageIDIndex)) - m.imageIDIndex++ - - info := &schmod.ImageInfo{ - ImageID: imageID, - PackageID: packageID, - } - - m.infos[imageID] = info - - return info, nil -} - -func (m *Manager) AddImageImportingInfo(imageID schsdk.ImageID, slwNodeID schsdk.SlwNodeID, slwNodeImageID schsdk.SlwNodeImageID) error { - m.lock.Lock() - defer m.lock.Unlock() - - info, ok := m.infos[imageID] - if !ok { - return fmt.Errorf("image not found") - } - - for _, im := range info.ImportingInfos { - if im.SlwNodeID == slwNodeID { - return fmt.Errorf("image had been imported to that slw node") - } - } - - info.ImportingInfos = append(info.ImportingInfos, schmod.ImageImportingInfo{ - SlwNodeID: slwNodeID, - SlwNodeImageID: slwNodeImageID, - }) - - return nil -} diff --git a/manager/internal/jobmgr/adjusting_handler.go b/manager/internal/jobmgr/adjusting_handler.go index 1beeab8..b5b587e 100644 --- a/manager/internal/jobmgr/adjusting_handler.go +++ b/manager/internal/jobmgr/adjusting_handler.go @@ -3,24 +3,24 @@ package jobmgr import ( "fmt" "reflect" + "time" "gitlink.org.cn/cloudream/common/pkgs/actor" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" - uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" - colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" "gitlink.org.cn/cloudream/scheduler/common/utils" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" ) type adjustingJob struct { - job *jobmod.NormalJob - state *jobmod.StateAdjusting - slwNodeInfo *uopsdk.SlwNode + job *jobmod.NormalJob + state *jobmod.StateAdjusting + ccInfo schmod.ComputingCenter } type AdjustingHandler struct { @@ -60,9 +60,9 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { } defer schglb.CollectorMQPool.Release(colCli) - getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(adjustingState.Scheme.TargetSlwNodeID)) + ccInfo, err := h.mgr.db.ComputingCenter().GetByID(h.mgr.db.SQLCtx(), adjustingState.Scheme.TargetCCID) if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting slw node info: %s", err.Error()), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting computing center info: %s", err.Error()), job.GetState())) return } @@ -73,22 +73,22 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { } defer schglb.CloudreamStoragePool.Release(stgCli) - stgInfo, err := stgCli.StorageGetInfo(stgsdk.StorageGetInfoReq{ - StorageID: getNodeResp.StorageID, + stgInfo, err := stgCli.StorageGetInfo(cdssdk.StorageGetInfoReq{ + StorageID: ccInfo.CDSStorageID, }) if err != nil { h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err), job.GetState())) return } - norJob.TargetSlwNodeID = adjustingState.Scheme.TargetSlwNodeID + norJob.TargetCCID = adjustingState.Scheme.TargetCCID // TODO UserID norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 0, norJob.JobID) adjJob := &adjustingJob{ - job: norJob, - state: adjustingState, - slwNodeInfo: &getNodeResp.SlwNode, + job: norJob, + state: adjustingState, + ccInfo: ccInfo, } h.jobs[job.GetJobID()] = adjJob @@ -163,7 +163,7 @@ func (h *AdjustingHandler) doPackageScheduling(evt event.Event, job *adjustingJo } if scheme.Action == jobmod.ActionMove { - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.slwNodeInfo.StgNodeID)) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.ccInfo.CDSNodeID)) if err != nil { return fmt.Errorf("starting cache move package: %w", err) } @@ -175,7 +175,7 @@ func (h *AdjustingHandler) doPackageScheduling(evt event.Event, job *adjustingJo } if scheme.Action == jobmod.ActionLoad { - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.slwNodeInfo.StorageID)) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.ccInfo.CDSStorageID)) if err != nil { return fmt.Errorf("starting stroage load package: %w", err) } @@ -245,7 +245,11 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob, // 要导入镜像,则需要先将镜像移动到指点节点的缓存中 if scheme.Action == jobmod.ActionImportImage { - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.slwNodeInfo.StgNodeID)) + if file.PackageID == nil { + return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, job.ccInfo.CCID) + } + + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, *file.PackageID, job.ccInfo.CDSNodeID)) if err != nil { return fmt.Errorf("starting cache move package: %w", err) } @@ -277,7 +281,7 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob, return fmt.Errorf("there must be only 1 object in the package that will be imported") } - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.slwNodeInfo.ID, stgsdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) if err != nil { return fmt.Errorf("starting import image: %w", err) } @@ -304,9 +308,9 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob, } // 调整过程中不会更换镜像,所以ImageID不会发生变化 - err = h.mgr.imageMgr.AddImageImportingInfo(file.ImageID, job.slwNodeInfo.ID, uploadImageRet.ImageID) + err = h.mgr.db.PCMImage().Create(h.mgr.db.SQLCtx(), file.ImageID, job.ccInfo.CCID, uploadImageRet.PCMImageID, uploadImageRet.Name, time.Now()) if err != nil { - return fmt.Errorf("creating image info: %w", err) + return fmt.Errorf("creating pcm image info: %w", err) } state.Step = jobmod.StepCompleted diff --git a/manager/internal/jobmgr/executing_handler.go b/manager/internal/jobmgr/executing_handler.go index 5a0a74b..c20c385 100644 --- a/manager/internal/jobmgr/executing_handler.go +++ b/manager/internal/jobmgr/executing_handler.go @@ -11,7 +11,6 @@ import ( schglb "gitlink.org.cn/cloudream/scheduler/common/globals" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" - colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" exetsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" "gitlink.org.cn/cloudream/scheduler/common/utils" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" @@ -71,9 +70,15 @@ func (h *ExecutingHandler) onJobEvent(evt event.Event, job *executingJob) { func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob, norJob *jobmod.NormalJob) { if job.state.FullTaskID == "" { - info, err := h.mgr.imageMgr.GetImageImportingInfo(norJob.Files.Image.ImageID, norJob.TargetSlwNodeID) + pcmImgInfo, err := h.mgr.db.PCMImage().GetByImageIDAndCCID(h.mgr.db.SQLCtx(), norJob.Files.Image.ImageID, norJob.TargetCCID) if err != nil { - h.changeJobState(job.job, jobmod.NewStateFailed("getting image importing info: "+err.Error(), job.state)) + h.changeJobState(job.job, jobmod.NewStateFailed("getting pcm image info: "+err.Error(), job.state)) + return + } + + ccInfo, err := h.mgr.db.ComputingCenter().GetByID(h.mgr.db.SQLCtx(), norJob.TargetCCID) + if err != nil { + h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("getting computing center info: %s", err.Error()), job.state)) return } @@ -81,8 +86,8 @@ func (h *ExecutingHandler) onNormalJobEvent(evt event.Event, job *executingJob, fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(), exetsk.NewSubmitTask( - norJob.TargetSlwNodeID, - info.SlwNodeImageID, + ccInfo.PCMParticipantID, + pcmImgInfo.PCMImageID, // TODO 资源ID "6388d3c27f654fa5b11439a3d6098dbc", norJob.Info.Runtime.Command, @@ -160,15 +165,15 @@ func (h *ExecutingHandler) onResourceJobEvent(evt event.Event, job *executingJob } defer schglb.CollectorMQPool.Release(colCli) - getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(tarNorJob.TargetSlwNodeID)) + ccInfo, err := h.mgr.db.ComputingCenter().GetByID(h.mgr.db.SQLCtx(), tarNorJob.TargetCCID) if err != nil { - h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("getting slw node info: %s", err.Error()), job.state)) + h.changeJobState(job.job, jobmod.NewStateFailed(fmt.Sprintf("getting computing center info: %s", err.Error()), job.state)) return } fullTaskID, err := h.mgr.execMgr.StartTask(job.job.GetJobID(), exetsk.NewStorageCreatePackage( 0, // TOOD 用户ID - getNodeResp.StorageID, + ccInfo.CDSStorageID, tarNorJob.OutputFullPath, resJob.Info.BucketID, utils.MakeResourcePackageName(resJob.JobID), diff --git a/manager/internal/jobmgr/jobmgr.go b/manager/internal/jobmgr/jobmgr.go index 009c47b..ae855e6 100644 --- a/manager/internal/jobmgr/jobmgr.go +++ b/manager/internal/jobmgr/jobmgr.go @@ -11,11 +11,11 @@ import ( schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" "gitlink.org.cn/cloudream/scheduler/manager/internal/advisormgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr" - "gitlink.org.cn/cloudream/scheduler/manager/internal/imagemgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" ) @@ -28,9 +28,9 @@ type Manager struct { // 任何修改job、jobset的操作,都需要加这个锁 pubLock sync.Mutex - execMgr *executormgr.Manager - advMgr *advisormgr.Manager - imageMgr *imagemgr.Manager + execMgr *executormgr.Manager + advMgr *advisormgr.Manager + db *db.DB handlers map[reflect.Type]StateHandler defaultHandler StateHandler @@ -41,11 +41,11 @@ type Manager struct { jobs map[schsdk.JobID]*mgrJob } -func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, imageMgr *imagemgr.Manager) (*Manager, error) { +func NewManager(execMgr *executormgr.Manager, advMgr *advisormgr.Manager, db *db.DB) (*Manager, error) { mgr := &Manager{ - execMgr: execMgr, - advMgr: advMgr, - imageMgr: imageMgr, + execMgr: execMgr, + advMgr: advMgr, + db: db, handlers: make(map[reflect.Type]StateHandler), jobSets: make(map[schsdk.JobSetID]*jobmod.JobSet), @@ -136,7 +136,7 @@ func (m *Manager) SubmitJobSet(jobSetInfo schsdk.JobSetInfo, preScheduleScheme j } job.State = jobmod.NewStatePreScheduling(preSch) - job.TargetSlwNodeID = preSch.TargetSlwNodeID + job.TargetCCID = preSch.TargetCCID case *schsdk.ResourceJobInfo: job := jobmod.NewResourceJob(jobSetID, jobID, *info) diff --git a/manager/internal/jobmgr/prescheduling_handler.go b/manager/internal/jobmgr/prescheduling_handler.go index bea013f..ff01050 100644 --- a/manager/internal/jobmgr/prescheduling_handler.go +++ b/manager/internal/jobmgr/prescheduling_handler.go @@ -3,15 +3,15 @@ package jobmgr import ( "fmt" "reflect" + "time" "gitlink.org.cn/cloudream/common/pkgs/actor" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" - stgsdk "gitlink.org.cn/cloudream/common/sdks/storage" - uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + schmod "gitlink.org.cn/cloudream/scheduler/common/models" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" - colmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/collector" exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event" ) @@ -19,9 +19,9 @@ import ( var ErrPreScheduleFailed = fmt.Errorf("pre schedule failed") type preSchedulingJob struct { - job *jobmod.NormalJob - state *jobmod.StatePreScheduling - slwNodeInfo *uopsdk.SlwNode + job *jobmod.NormalJob + state *jobmod.StatePreScheduling + ccInfo schmod.ComputingCenter } type PreSchedulingHandler struct { @@ -61,17 +61,17 @@ func (h *PreSchedulingHandler) Handle(job jobmod.Job) { } defer schglb.CollectorMQPool.Release(colCli) - getNodeResp, err := colCli.GetSlwNodeInfo(colmq.NewGetSlwNodeInfo(preSchState.Scheme.TargetSlwNodeID)) + ccInfo, err := h.mgr.db.ComputingCenter().GetByID(h.mgr.db.SQLCtx(), preSchState.Scheme.TargetCCID) if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting slw node info: %s", err.Error()), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting computing center info: %s", err.Error()), job.GetState())) return } - norJob.TargetSlwNodeID = preSchState.Scheme.TargetSlwNodeID + norJob.TargetCCID = preSchState.Scheme.TargetCCID preJob := &preSchedulingJob{ - job: norJob, - state: preSchState, - slwNodeInfo: &getNodeResp.SlwNode, + job: norJob, + state: preSchState, + ccInfo: ccInfo, } h.jobs[job.GetJobID()] = preJob @@ -182,7 +182,7 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche } if scheme.Action == jobmod.ActionMove { - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.slwNodeInfo.StgNodeID)) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.ccInfo.CDSNodeID)) if err != nil { return fmt.Errorf("starting cache move package: %w", err) } @@ -194,7 +194,7 @@ func (h *PreSchedulingHandler) doPackageScheduling(evt event.Event, job *preSche } if scheme.Action == jobmod.ActionLoad { - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.slwNodeInfo.StorageID)) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.ccInfo.CDSStorageID)) if err != nil { return fmt.Errorf("starting stroage load package: %w", err) } @@ -260,13 +260,13 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu state.Step = jobmod.StepUploading case *schsdk.ImageJobFileInfo: - imageInfo, err := h.mgr.imageMgr.GetImageInfo(info.ImageID) + imageInfo, err := h.mgr.db.Image().GetByID(h.mgr.db.SQLCtx(), info.ImageID) if err != nil { return fmt.Errorf("getting image info: %w", err) } file.ImageID = imageInfo.ImageID - file.PackageID = imageInfo.PackageID + file.PackageID = imageInfo.CDSPackageID state.Step = jobmod.StepUploaded default: @@ -293,14 +293,15 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu } // 上传完毕,则可以新建一个空的镜像的记录 - info, err := h.mgr.imageMgr.CreateImage(localFileCmd.PackageID) + // TODO 镜像名称 + imgID, err := h.mgr.db.Image().Create(h.mgr.db.SQLCtx(), &localFileCmd.PackageID, fmt.Sprintf("UPLOAD@%s", time.Now().Unix()), time.Now()) if err != nil { return fmt.Errorf("creating image info: %w", err) } // 填充ImageID和PackageID - file.ImageID = info.ImageID - file.PackageID = localFileCmd.PackageID + file.ImageID = imgID + file.PackageID = &localFileCmd.PackageID state.Step = jobmod.StepUploaded } @@ -312,7 +313,11 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu // 要导入镜像,则需要先将镜像移动到指点节点的缓存中 if scheme.Action == jobmod.ActionImportImage { - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.slwNodeInfo.StgNodeID)) + if file.PackageID == nil { + return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, job.ccInfo.CCID) + } + + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, *file.PackageID, job.ccInfo.CDSNodeID)) if err != nil { return fmt.Errorf("starting cache move package: %w", err) } @@ -348,7 +353,7 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu return fmt.Errorf("there must be only 1 object in the package which will be imported") } - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.slwNodeInfo.ID, stgsdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) if err != nil { return fmt.Errorf("starting import image: %w", err) } @@ -374,7 +379,7 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu return fmt.Errorf("import image: %s", uploadImageRet.Error) } - err = h.mgr.imageMgr.AddImageImportingInfo(file.ImageID, job.slwNodeInfo.ID, uploadImageRet.ImageID) + err = h.mgr.db.PCMImage().Create(h.mgr.db.SQLCtx(), file.ImageID, job.ccInfo.CCID, uploadImageRet.PCMImageID, uploadImageRet.Name, time.Now()) if err != nil { return fmt.Errorf("adding image importing info: %w", err) } diff --git a/manager/internal/mq/computing_center.go b/manager/internal/mq/computing_center.go new file mode 100644 index 0000000..4d8e9ee --- /dev/null +++ b/manager/internal/mq/computing_center.go @@ -0,0 +1,18 @@ +package mq + +import ( + "gitlink.org.cn/cloudream/common/consts/errorcode" + "gitlink.org.cn/cloudream/common/pkgs/logger" + "gitlink.org.cn/cloudream/common/pkgs/mq" + mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" +) + +func (svc *Service) GetAllComputingCenter(msg *mgrmq.GetAllComputingCenter) (*mgrmq.GetAllComputingCenterResp, *mq.CodeMessage) { + ccs, err := svc.db.ComputingCenter().GetAll(svc.db.SQLCtx()) + if err != nil { + logger.Warnf("getting all computing center: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get all computing center failed") + } + + return mq.ReplyOK(mgrmq.NewGetAllComputingCenterResp(ccs)) +} diff --git a/manager/internal/mq/image.go b/manager/internal/mq/image.go index ad5cf4f..7bbf7ec 100644 --- a/manager/internal/mq/image.go +++ b/manager/internal/mq/image.go @@ -8,11 +8,17 @@ import ( ) func (svc *Service) GetImageInfo(msg *mgrmq.GetImageInfo) (*mgrmq.GetImageInfoResp, *mq.CodeMessage) { - info, err := svc.imgMgr.GetImageInfo(msg.ImageID) + image, err := svc.db.Image().GetByID(svc.db.SQLCtx(), msg.ImageID) if err != nil { - logger.WithField("ImageID", msg.ImageID).Warnf("getting image info: %s", err.Error()) - return nil, mq.Failed(errorcode.OperationFailed, "get image info failed") + logger.WithField("ImageID", msg.ImageID).Warnf("getting image by id: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get image failed") } - return mq.ReplyOK(mgrmq.NewGetImageInfoResp(info.ImageID, info.PackageID, info.ImportingInfos)) + pcmImages, err := svc.db.PCMImage().GetByImageID(svc.db.SQLCtx(), msg.ImageID) + if err != nil { + logger.WithField("ImageID", msg.ImageID).Warnf("getting pcm image by image id: %s", err.Error()) + return nil, mq.Failed(errorcode.OperationFailed, "get pcm images failed") + } + + return mq.ReplyOK(mgrmq.NewGetImageInfoResp(image, pcmImages)) } diff --git a/manager/internal/mq/service.go b/manager/internal/mq/service.go index 5f61bb7..15fe780 100644 --- a/manager/internal/mq/service.go +++ b/manager/internal/mq/service.go @@ -1,9 +1,9 @@ package mq import ( + "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" "gitlink.org.cn/cloudream/scheduler/manager/internal/advisormgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr" - "gitlink.org.cn/cloudream/scheduler/manager/internal/imagemgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" ) @@ -11,14 +11,14 @@ type Service struct { jobMgr *jobmgr.Manager exeMgr *executormgr.Manager advMgr *advisormgr.Manager - imgMgr *imagemgr.Manager + db *db.DB } -func NewService(jobMan *jobmgr.Manager, exeMgr *executormgr.Manager, advMgr *advisormgr.Manager, imgMgr *imagemgr.Manager) (*Service, error) { +func NewService(jobMan *jobmgr.Manager, exeMgr *executormgr.Manager, advMgr *advisormgr.Manager, db *db.DB) (*Service, error) { return &Service{ jobMgr: jobMan, exeMgr: exeMgr, advMgr: advMgr, - imgMgr: imgMgr, + db: db, }, nil } diff --git a/manager/main.go b/manager/main.go index 87c3d06..c5ca7d0 100644 --- a/manager/main.go +++ b/manager/main.go @@ -9,11 +9,11 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" "gitlink.org.cn/cloudream/scheduler/manager/internal/advisormgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/config" "gitlink.org.cn/cloudream/scheduler/manager/internal/executormgr" - "gitlink.org.cn/cloudream/scheduler/manager/internal/imagemgr" "gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr" mqsvc "gitlink.org.cn/cloudream/scheduler/manager/internal/mq" ) @@ -46,19 +46,19 @@ func main() { os.Exit(1) } - imgMgr, err := imagemgr.NewManager() + db, err := db.NewDB(&config.Cfg().DB) if err != nil { - fmt.Printf("new image manager: %s", err.Error()) + fmt.Printf("new db: %s", err.Error()) os.Exit(1) } - jobMgr, err := jobmgr.NewManager(exeMgr, advMgr, imgMgr) + jobMgr, err := jobmgr.NewManager(exeMgr, advMgr, db) if err != nil { fmt.Printf("new job manager: %s", err.Error()) os.Exit(1) } - svc, err := mqsvc.NewService(jobMgr, exeMgr, advMgr, imgMgr) + svc, err := mqsvc.NewService(jobMgr, exeMgr, advMgr, db) if err != nil { fmt.Printf("new service: %s", err.Error()) os.Exit(1)