JCC-CSScheduler/manager/internal/jobmgr/adjusting_handler.go

372 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package jobmgr
import (
"fmt"
"reflect"
"time"
"gitlink.org.cn/cloudream/common/pkgs/actor"
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
"gitlink.org.cn/cloudream/scheduler/common/utils"
"gitlink.org.cn/cloudream/scheduler/manager/internal/jobmgr/event"
)
type adjustingJob struct {
job *jobmod.NormalJob
state *jobmod.StateAdjusting
ccInfo schmod.ComputingCenter
}
type AdjustingHandler struct {
mgr *Manager
jobs map[schsdk.JobID]*adjustingJob
cmdChan actor.CommandChannel
}
func NewAdjustingHandler(mgr *Manager) *AdjustingHandler {
return &AdjustingHandler{
mgr: mgr,
jobs: make(map[schsdk.JobID]*adjustingJob),
cmdChan: *actor.NewCommandChannel(),
}
}
func (h *AdjustingHandler) Handle(job jobmod.Job) {
h.cmdChan.Send(func() {
norJob, ok := job.(*jobmod.NormalJob)
if !ok {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow job: %v", reflect.TypeOf(job)), job.GetState()))
return
}
adjustingState, ok := norJob.GetState().(*jobmod.StateAdjusting)
if !ok {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("unknow state: %v", reflect.TypeOf(job.GetState())), job.GetState()))
return
}
colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err.Error()), job.GetState()))
return
}
defer schglb.CollectorMQPool.Release(colCli)
ccInfo, err := h.mgr.db.ComputingCenter().GetByID(h.mgr.db.SQLCtx(), adjustingState.Scheme.TargetCCID)
if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting computing center info: %s", err.Error()), job.GetState()))
return
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err.Error()), job.GetState()))
return
}
defer schglb.CloudreamStoragePool.Release(stgCli)
stgInfo, err := stgCli.StorageGetInfo(cdssdk.StorageGetInfoReq{
StorageID: ccInfo.CDSStorageID,
})
if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err.Error()), job.GetState()))
return
}
norJob.TargetCCID = adjustingState.Scheme.TargetCCID
// TODO UserID
norJob.OutputFullPath = utils.MakeJobOutputFullPath(stgInfo.Directory, 0, norJob.JobID)
adjJob := &adjustingJob{
job: norJob,
state: adjustingState,
ccInfo: ccInfo,
}
h.jobs[job.GetJobID()] = adjJob
h.onJobEvent(nil, adjJob)
})
}
func (h *AdjustingHandler) onJobEvent(evt event.Event, job *adjustingJob) {
if cloneEvt, ok := evt.(*event.CloneJob); ok {
cloneEvt.Callback.SetValue(job.job.Clone())
return
}
err := h.doPackageScheduling(evt, job,
job.job.Info.Files.Dataset, &job.job.Files.Dataset,
&job.state.Scheme.Dataset, &job.state.Dataset,
)
if err != nil {
job.state.Dataset.Error = err.Error()
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
return
}
err = h.doPackageScheduling(evt, job,
job.job.Info.Files.Code, &job.job.Files.Code,
&job.state.Scheme.Code, &job.state.Code,
)
if err != nil {
job.state.Code.Error = err.Error()
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
return
}
err = h.doImageScheduling(evt, job,
job.job.Info.Files.Image, &job.job.Files.Image,
&job.state.Scheme.Image, &job.state.Image,
)
if err != nil {
job.state.Image.Error = err.Error()
h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state))
return
}
// 如果三种文件都调度完成,则可以进入下个阶段了
if job.state.Dataset.Step == jobmod.StepCompleted &&
job.state.Code.Step == jobmod.StepCompleted &&
job.state.Image.Step == jobmod.StepCompleted {
h.changeJobState(job.job, jobmod.NewStateReadyToExecute())
}
}
func (h *AdjustingHandler) changeJobState(job jobmod.Job, state jobmod.JobState) {
job.SetState(state)
delete(h.jobs, job.GetJobID())
h.mgr.pubLock.Lock()
h.mgr.handleState(job)
h.mgr.pubLock.Unlock()
}
func (h *AdjustingHandler) doPackageScheduling(evt event.Event, job *adjustingJob, fileInfo schsdk.JobFileInfo, file *jobmod.PackageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error {
if state.Step == jobmod.StepBegin {
state.Step = jobmod.StepUploaded
}
if state.Step == jobmod.StepUploaded {
if scheme.Action == jobmod.ActionNo {
state.Step = jobmod.StepCompleted
return nil
}
if scheme.Action == jobmod.ActionMove {
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, file.PackageID, job.ccInfo.CDSNodeID))
if err != nil {
return fmt.Errorf("starting cache move package: %w", err)
}
state.Step = jobmod.StepMoving
state.FullTaskID = fullTaskID
return nil
}
if scheme.Action == jobmod.ActionLoad {
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewStorageLoadPackage(0, file.PackageID, job.ccInfo.CDSStorageID))
if err != nil {
return fmt.Errorf("starting stroage load package: %w", err)
}
state.Step = jobmod.StepLoading
state.FullTaskID = fullTaskID
return nil
}
return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
}
if state.Step == jobmod.StepMoving {
moveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("cache move package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if moveRet.Error != "" {
return fmt.Errorf("cache move pacakge: %s", moveRet.Error)
}
state.Step = jobmod.StepCompleted
return nil
}
if state.Step == jobmod.StepLoading {
loadRet, err := event.AssertExecutorTaskStatus[*exectsk.StorageLoadPackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("storage load package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if loadRet.Error != "" {
return fmt.Errorf("storage load package: %s", loadRet.Error)
}
file.FullPath = loadRet.FullPath
state.Step = jobmod.StepCompleted
return nil
}
return nil
}
func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob, fileInfo schsdk.JobFileInfo, file *jobmod.ImageJobFile, scheme *jobmod.FileScheduleScheme, state *jobmod.FileSchedulingState) error {
if state.Step == jobmod.StepBegin {
state.Step = jobmod.StepUploaded
}
if state.Step == jobmod.StepUploaded {
if scheme.Action == jobmod.ActionNo {
state.Step = jobmod.StepCompleted
return nil
}
// 要导入镜像,则需要先将镜像移动到指点节点的缓存中
if scheme.Action == jobmod.ActionImportImage {
if file.PackageID == nil {
return fmt.Errorf("image %v has no associated package, which cannot be uploaded to %v", file.ImageID, job.ccInfo.CCID)
}
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewCacheMovePackage(0, *file.PackageID, job.ccInfo.CDSNodeID))
if err != nil {
return fmt.Errorf("starting cache move package: %w", err)
}
state.Step = jobmod.StepMoving
state.FullTaskID = fullTaskID
return nil
}
return fmt.Errorf("invalid schedule action %s for file info type %v", scheme.Action, reflect.TypeOf(fileInfo))
}
if state.Step == jobmod.StepMoving {
cacheMoveRet, err := event.AssertExecutorTaskStatus[*exectsk.CacheMovePackageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("cache move package timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if cacheMoveRet.Error != "" {
return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
}
stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}
if len(pkgObjs.Objects) != 1 {
return fmt.Errorf("there must be only 1 object in the package that will be imported")
}
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)))
if err != nil {
return fmt.Errorf("starting import image: %w", err)
}
state.Step = jobmod.StepImageImporting
state.FullTaskID = fullTaskID
return nil
}
if state.Step == jobmod.StepImageImporting {
uploadImageRet, err := event.AssertExecutorTaskStatus[*exectsk.UploadImageStatus](evt, state.FullTaskID)
if err == event.ErrUnconcernedTask {
return nil
}
if err == event.ErrTaskTimeout {
return fmt.Errorf("import image timeout")
}
h.mgr.execMgr.ForgetTask(state.FullTaskID)
if uploadImageRet.Error != "" {
return fmt.Errorf("import image: %s", uploadImageRet.Error)
}
// 调整过程中不会更换镜像所以ImageID不会发生变化
err = h.mgr.db.PCMImage().Create(h.mgr.db.SQLCtx(), file.ImageID, job.ccInfo.CCID, uploadImageRet.PCMImageID, uploadImageRet.Name, time.Now())
if err != nil {
return fmt.Errorf("creating pcm image info: %w", err)
}
state.Step = jobmod.StepCompleted
return nil
}
return nil
}
func (h *AdjustingHandler) OnEvent(broadcast event.Broadcast, evt event.Event) {
h.cmdChan.Send(func() {
if broadcast.ToAll() {
for _, job := range h.jobs {
h.onJobEvent(evt, job)
}
} else if broadcast.ToJobSet() {
for _, job := range h.jobs {
if job.job.GetJobSetID() != broadcast.JobSetID {
continue
}
h.onJobEvent(evt, job)
}
} else if broadcast.ToJob() {
if job, ok := h.jobs[broadcast.JobID]; ok {
h.onJobEvent(evt, job)
}
}
})
}
func (h *AdjustingHandler) Serve() {
cmdChan := h.cmdChan.BeginChanReceive()
defer h.cmdChan.CloseChanReceive()
for {
select {
case cmd := <-cmdChan:
cmd()
}
}
}
func (h *AdjustingHandler) Stop() {
// TODO 支持STOP
}