From 3edb51128060111843d927adfc49aa7cba37cca8 Mon Sep 17 00:00:00 2001 From: Sydonian <794346190@qq.com> Date: Wed, 6 Dec 2023 09:46:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- collector/internal/mq/storage.go | 2 +- common/pkgs/mq/collector/storage.go | 7 +++---- .../mq/executor/task/cache_move_package.go | 8 +++---- executor/internal/task/cache_move_package.go | 16 +++++++------- manager/internal/jobmgr/adjusting_handler.go | 21 ++++++++++++++----- .../internal/jobmgr/prescheduling_handler.go | 17 ++++++++++++--- 6 files changed, 45 insertions(+), 26 deletions(-) diff --git a/collector/internal/mq/storage.go b/collector/internal/mq/storage.go index ac2cd4f..b402da9 100644 --- a/collector/internal/mq/storage.go +++ b/collector/internal/mq/storage.go @@ -26,7 +26,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed") } - return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType)) + return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize)) } func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) { diff --git a/common/pkgs/mq/collector/storage.go b/common/pkgs/mq/collector/storage.go index 1092c68..fd17b79 100644 --- a/common/pkgs/mq/collector/storage.go +++ b/common/pkgs/mq/collector/storage.go @@ -30,12 +30,11 @@ func NewPackageGetCachedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageI PackageID: packageID, } } -func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *PackageGetCachedStgNodesResp { +func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp { return &PackageGetCachedStgNodesResp{ PackageCachingInfo: cdssdk.PackageCachingInfo{ - NodeInfos: nodeInfos, - PackageSize: packageSize, - RedunancyType: redunancyType, + NodeInfos: nodeInfos, + PackageSize: packageSize, }, } } diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go index bed4909..5414bf3 100644 --- a/common/pkgs/mq/executor/task/cache_move_package.go +++ b/common/pkgs/mq/executor/task/cache_move_package.go @@ -10,8 +10,7 @@ type CacheMovePackage struct { } type CacheMovePackageStatus struct { TaskStatusBase - Error string `json:"error"` - CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"` + Error string `json:"error"` } func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage { @@ -21,10 +20,9 @@ func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNo StgNodeID: stgNodeID, } } -func NewCacheMovePackageStatus(err string, cacheInfos []cdssdk.ObjectCacheInfo) *CacheMovePackageStatus { +func NewCacheMovePackageStatus(err string) *CacheMovePackageStatus { return &CacheMovePackageStatus{ - Error: err, - CacheInfos: cacheInfos, + Error: err, } } diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go index 132c396..b7d4c70 100644 --- a/executor/internal/task/cache_move_package.go +++ b/executor/internal/task/cache_move_package.go @@ -26,11 +26,11 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext log.Debugf("begin with %v", logger.FormatStruct(t.CacheMovePackage)) defer log.Debugf("end") - cacheInfos, err := t.do(ctx) + err := t.do(ctx) if err != nil { - ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error(), cacheInfos)) + ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error())) } else { - ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("", cacheInfos)) + ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("")) } ctx.reporter.ReportNow() @@ -39,23 +39,23 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext }) } -func (t *CacheMovePackage) do(ctx TaskContext) ([]cdssdk.ObjectCacheInfo, error) { +func (t *CacheMovePackage) do(ctx TaskContext) error { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { - return nil, fmt.Errorf("new cloudream storage client: %w", err) + return fmt.Errorf("new cloudream storage client: %w", err) } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ + _, err = stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ UserID: t.UserID, PackageID: t.PackageID, NodeID: t.StgNodeID, }) if err != nil { - return nil, err + return err } - return resp.CacheInfos, err + return err } func init() { diff --git a/manager/internal/jobmgr/adjusting_handler.go b/manager/internal/jobmgr/adjusting_handler.go index b5b587e..4e64b4f 100644 --- a/manager/internal/jobmgr/adjusting_handler.go +++ b/manager/internal/jobmgr/adjusting_handler.go @@ -55,7 +55,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err.Error()), job.GetState())) return } defer schglb.CollectorMQPool.Release(colCli) @@ -68,7 +68,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err.Error()), job.GetState())) return } defer schglb.CloudreamStoragePool.Release(stgCli) @@ -77,7 +77,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { StorageID: ccInfo.CDSStorageID, }) if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err.Error()), job.GetState())) return } @@ -277,11 +277,22 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob, return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) } - if len(cacheMoveRet.CacheInfos) != 1 { + stgCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new cloudream storage client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(stgCli) + + pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID}) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + if len(pkgObjs.Objects) != 1 { return fmt.Errorf("there must be only 1 object in the package that will be imported") } - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash))) if err != nil { return fmt.Errorf("starting import image: %w", err) } diff --git a/manager/internal/jobmgr/prescheduling_handler.go b/manager/internal/jobmgr/prescheduling_handler.go index ff01050..db8f590 100644 --- a/manager/internal/jobmgr/prescheduling_handler.go +++ b/manager/internal/jobmgr/prescheduling_handler.go @@ -345,15 +345,26 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) } - if len(cacheMoveRet.CacheInfos) == 0 { + stgCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new cloudream storage client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(stgCli) + + pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID}) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + if len(pkgObjs.Objects) == 0 { return fmt.Errorf("no object in the package which will be imported") } - if len(cacheMoveRet.CacheInfos) > 1 { + if len(pkgObjs.Objects) > 1 { return fmt.Errorf("there must be only 1 object in the package which will be imported") } - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash))) if err != nil { return fmt.Errorf("starting import image: %w", err) }