diff --git a/advisor/internal/scheduler/scheduler.go b/advisor/internal/scheduler/scheduler.go index 7c92ec4..866300e 100644 --- a/advisor/internal/scheduler/scheduler.go +++ b/advisor/internal/scheduler/scheduler.go @@ -8,6 +8,7 @@ import ( "github.com/samber/lo" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" "gitlink.org.cn/cloudream/common/utils/math" @@ -362,7 +363,7 @@ func (s *DefaultScheduler) calcResourceLevel(avai float64, need float64) int { // 计算节点得分情况 func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsdk.CCID]*candidate) error { // 只计算运控返回的计算中心上的存储服务的数据权重 - cdsNodeToCC := make(map[int64]*candidate) + cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) for _, cc := range allCCs { cdsNodeToCC[cc.CC.CDSNodeID] = cc } @@ -407,7 +408,7 @@ func (s *DefaultScheduler) calcFileScore(files jobmod.JobFiles, allCCs map[schsd } // 计算package在各节点的得分情况 -func (s *DefaultScheduler) calcPackageFileScore(packageID int64, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -458,7 +459,7 @@ func (s *DefaultScheduler) calcPackageFileScore(packageID int64, cdsNodeToCC map } // 计算package在各节点的得分情况 -func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) diff --git a/client/internal/http/jobset.go b/client/internal/http/jobset.go index 65f5c88..59d8cec 100644 --- a/client/internal/http/jobset.go +++ b/client/internal/http/jobset.go @@ -8,6 +8,7 @@ import ( "gitlink.org.cn/cloudream/common/consts/errorcode" "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" "gitlink.org.cn/cloudream/common/utils/serder" ) @@ -57,10 +58,10 @@ func (s *JobSetService) Submit(ctx *gin.Context) { } type JobSetLocalFileUploadedReq struct { - JobSetID schsdk.JobSetID `json:"jobSetID" binding:"required"` - LocalPath string `json:"localPath" binding:"required"` - Error string `json:"error"` - PackageID int64 `json:"packageID"` + JobSetID schsdk.JobSetID `json:"jobSetID" binding:"required"` + LocalPath string `json:"localPath" binding:"required"` + Error string `json:"error"` + PackageID cdssdk.PackageID `json:"packageID"` } func (s *JobSetService) LocalFileUploaded(ctx *gin.Context) { diff --git a/client/internal/prescheduler/default_prescheduler.go b/client/internal/prescheduler/default_prescheduler.go index 7d30516..96ec9da 100644 --- a/client/internal/prescheduler/default_prescheduler.go +++ b/client/internal/prescheduler/default_prescheduler.go @@ -8,6 +8,7 @@ import ( "github.com/samber/lo" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" "gitlink.org.cn/cloudream/common/utils/math" @@ -534,7 +535,7 @@ func (s *DefaultPreScheduler) calcResourceLevel(avai float64, need float64) int // 计算节点得分情况 func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs map[schsdk.CCID]*candidate) error { // 只计算运控返回的可用计算中心上的存储服务的数据权重 - cdsNodeToCC := make(map[int64]*candidate) + cdsNodeToCC := make(map[cdssdk.NodeID]*candidate) for _, cc := range allCCs { cdsNodeToCC[cc.CC.CDSNodeID] = cc } @@ -586,7 +587,7 @@ func (s *DefaultPreScheduler) calcFileScore(files schsdk.JobFilesInfo, allCCs ma } // 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultPreScheduler) calcPackageFileScore(packageID cdssdk.PackageID, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) @@ -639,7 +640,7 @@ func (s *DefaultPreScheduler) calcPackageFileScore(packageID int64, cdsNodeToCC } // 计算package在各节点的得分情况 -func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[int64]*candidate) (map[schsdk.CCID]*fileDetail, error) { +func (s *DefaultPreScheduler) calcImageFileScore(imageID schsdk.ImageID, allCCs map[schsdk.CCID]*candidate, cdsNodeToCC map[cdssdk.NodeID]*candidate) (map[schsdk.CCID]*fileDetail, error) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { return nil, fmt.Errorf("new collector client: %w", err) diff --git a/client/internal/services/jobset.go b/client/internal/services/jobset.go index dc1a532..9cbe6ad 100644 --- a/client/internal/services/jobset.go +++ b/client/internal/services/jobset.go @@ -4,6 +4,7 @@ import ( "fmt" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager" ) @@ -38,7 +39,7 @@ func (svc *JobSetService) Submit(info schsdk.JobSetInfo) (schsdk.JobSetID, *schs } // 任务集中某个文件上传完成 -func (svc *JobSetService) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, errMsg string, packageID int64) error { +func (svc *JobSetService) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, errMsg string, packageID cdssdk.PackageID) error { mgrCli, err := schglb.ManagerMQPool.Acquire() if err != nil { return fmt.Errorf("new manager client: %w", err) diff --git a/collector/internal/mq/storage.go b/collector/internal/mq/storage.go index ac2cd4f..b402da9 100644 --- a/collector/internal/mq/storage.go +++ b/collector/internal/mq/storage.go @@ -26,7 +26,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed") } - return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType)) + return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize)) } func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) { diff --git a/common/models/job/normal_job.go b/common/models/job/normal_job.go index 10766dc..bb1dea1 100644 --- a/common/models/job/normal_job.go +++ b/common/models/job/normal_job.go @@ -2,6 +2,7 @@ package jobmod import ( schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) type NormalJob struct { @@ -35,11 +36,11 @@ type JobFiles struct { } type PackageJobFile struct { - PackageID int64 `json:"packageID"` - FullPath string `json:"fullPath"` // Load之后的完整文件路径 + PackageID cdssdk.PackageID `json:"packageID"` + FullPath string `json:"fullPath"` // Load之后的完整文件路径 } type ImageJobFile struct { - PackageID *int64 `json:"packageID"` - ImageID schsdk.ImageID `json:"imageID"` + PackageID *cdssdk.PackageID `json:"packageID"` + ImageID schsdk.ImageID `json:"imageID"` } diff --git a/common/models/job/resource_job.go b/common/models/job/resource_job.go index 224c0b9..8cf02b5 100644 --- a/common/models/job/resource_job.go +++ b/common/models/job/resource_job.go @@ -1,11 +1,14 @@ package jobmod -import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) type ResourceJob struct { JobBase Info schsdk.ResourceJobInfo `json:"info"` - ResourcePackageID int64 `json:"resourcePackageID"` // 回源之后得到的PackageID + ResourcePackageID cdssdk.PackageID `json:"resourcePackageID"` // 回源之后得到的PackageID } func NewResourceJob(jobSetID schsdk.JobSetID, jobID schsdk.JobID, info schsdk.ResourceJobInfo) *ResourceJob { diff --git a/common/models/models.go b/common/models/models.go index f0146dc..14b8435 100644 --- a/common/models/models.go +++ b/common/models/models.go @@ -6,6 +6,7 @@ import ( pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uopsdk "gitlink.org.cn/cloudream/common/sdks/unifyops" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" "gitlink.org.cn/cloudream/common/utils/serder" @@ -23,9 +24,9 @@ type ComputingCenter struct { // 计算中心在PCM系统的ID PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID" db:"PCMParticipantID"` // 计算中心在存储系统的ID - CDSNodeID int64 `json:"cdsNodeID" db:"CDSNodeID"` + CDSNodeID cdssdk.NodeID `json:"cdsNodeID" db:"CDSNodeID"` // 此算力中心的存储服务对应在存储系统中的ID - CDSStorageID int64 `json:"cdsStorageID" db:"CDSStorageID"` + CDSStorageID cdssdk.StorageID `json:"cdsStorageID" db:"CDSStorageID"` // 计算中心名称 Name string `json:"name" db:"Name"` } @@ -34,7 +35,7 @@ type Image struct { // 调度系统内的镜像ID ImageID schsdk.ImageID `json:"imageID" db:"ImageID"` // 镜像文件对应的存储系统PackageID,可以为空,为空则代表此镜像不可被自动导入到算力中心,比如是预制镜像 - CDSPackageID *int64 `json:"cdsPackageID" db:"CDSPackageID"` + CDSPackageID *cdssdk.PackageID `json:"cdsPackageID" db:"CDSPackageID"` // 镜像名称,在调度系统上设置的 Name string `json:"name" db:"Name"` // 镜像创建时间 diff --git a/common/pkgs/db/image.go b/common/pkgs/db/image.go index 53e57b0..8f7f6da 100644 --- a/common/pkgs/db/image.go +++ b/common/pkgs/db/image.go @@ -5,6 +5,7 @@ import ( "github.com/jmoiron/sqlx" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" schmod "gitlink.org.cn/cloudream/scheduler/common/models" ) @@ -22,7 +23,7 @@ func (*ImageDB) GetByID(ctx SQLContext, id schsdk.ImageID) (schmod.Image, error) return ret, err } -func (*ImageDB) Create(ctx SQLContext, cdsPackageID *int64, name string, createTime time.Time) (schsdk.ImageID, error) { +func (*ImageDB) Create(ctx SQLContext, cdsPackageID *cdssdk.PackageID, name string, createTime time.Time) (schsdk.ImageID, error) { ret, err := ctx.Exec("insert into Image(CDSPackageID, Name, CreateTime) values(?, ?, ?)", cdsPackageID, name, createTime) if err != nil { return 0, err diff --git a/common/pkgs/mq/collector/storage.go b/common/pkgs/mq/collector/storage.go index f8b181b..fd17b79 100644 --- a/common/pkgs/mq/collector/storage.go +++ b/common/pkgs/mq/collector/storage.go @@ -16,26 +16,25 @@ var _ = Register(Service.PackageGetCachedStgNodes) type PackageGetCachedStgNodes struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type PackageGetCachedStgNodesResp struct { mq.MessageBodyBase cdssdk.PackageCachingInfo } -func NewPackageGetCachedStgNodes(userID int64, packageID int64) *PackageGetCachedStgNodes { +func NewPackageGetCachedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetCachedStgNodes { return &PackageGetCachedStgNodes{ UserID: userID, PackageID: packageID, } } -func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *PackageGetCachedStgNodesResp { +func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp { return &PackageGetCachedStgNodesResp{ PackageCachingInfo: cdssdk.PackageCachingInfo{ - NodeInfos: nodeInfos, - PackageSize: packageSize, - RedunancyType: redunancyType, + NodeInfos: nodeInfos, + PackageSize: packageSize, }, } } @@ -48,21 +47,21 @@ var _ = Register(Service.PackageGetLoadedStgNodes) type PackageGetLoadedStgNodes struct { mq.MessageBodyBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` } type PackageGetLoadedStgNodesResp struct { mq.MessageBodyBase - StgNodeIDs []int64 `json:"stgNodeIDs"` + StgNodeIDs []cdssdk.NodeID `json:"stgNodeIDs"` } -func NewPackageGetLoadedStgNodes(userID int64, packageID int64) *PackageGetLoadedStgNodes { +func NewPackageGetLoadedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageID) *PackageGetLoadedStgNodes { return &PackageGetLoadedStgNodes{ UserID: userID, PackageID: packageID, } } -func NewPackageGetLoadedStgNodesResp(nodeIDs []int64) *PackageGetLoadedStgNodesResp { +func NewPackageGetLoadedStgNodesResp(nodeIDs []cdssdk.NodeID) *PackageGetLoadedStgNodesResp { return &PackageGetLoadedStgNodesResp{ StgNodeIDs: nodeIDs, } diff --git a/common/pkgs/mq/executor/task/cache_move_package.go b/common/pkgs/mq/executor/task/cache_move_package.go index 79b4117..5414bf3 100644 --- a/common/pkgs/mq/executor/task/cache_move_package.go +++ b/common/pkgs/mq/executor/task/cache_move_package.go @@ -4,27 +4,25 @@ import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" type CacheMovePackage struct { TaskInfoBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StgNodeID int64 `json:"stgNodeID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + StgNodeID cdssdk.NodeID `json:"stgNodeID"` } type CacheMovePackageStatus struct { TaskStatusBase - Error string `json:"error"` - CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"` + Error string `json:"error"` } -func NewCacheMovePackage(userID int64, packageID int64, stgNodeID int64) *CacheMovePackage { +func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage { return &CacheMovePackage{ UserID: userID, PackageID: packageID, StgNodeID: stgNodeID, } } -func NewCacheMovePackageStatus(err string, cacheInfos []cdssdk.ObjectCacheInfo) *CacheMovePackageStatus { +func NewCacheMovePackageStatus(err string) *CacheMovePackageStatus { return &CacheMovePackageStatus{ - Error: err, - CacheInfos: cacheInfos, + Error: err, } } diff --git a/common/pkgs/mq/executor/task/storage_create_package.go b/common/pkgs/mq/executor/task/storage_create_package.go index f88ad24..79e23b9 100644 --- a/common/pkgs/mq/executor/task/storage_create_package.go +++ b/common/pkgs/mq/executor/task/storage_create_package.go @@ -4,31 +4,29 @@ import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" type StorageCreatePackage struct { TaskInfoBase - UserID int64 `json:"userID"` - StorageID int64 `json:"storageID"` - Path string `json:"path"` - BucketID int64 `json:"bucketID"` - Name string `json:"name"` - Redundancy cdssdk.TypedRedundancyInfo `json:"redundancy"` + UserID cdssdk.UserID `json:"userID"` + StorageID cdssdk.StorageID `json:"storageID"` + Path string `json:"path"` + BucketID cdssdk.BucketID `json:"bucketID"` + Name string `json:"name"` } type StorageCreatePackageStatus struct { TaskStatusBase - Status string `json:"status"` - Error string `json:"error"` - PackageID int64 `json:"packageID"` + Status string `json:"status"` + Error string `json:"error"` + PackageID cdssdk.PackageID `json:"packageID"` } -func NewStorageCreatePackage(userID int64, storageID int64, filePath string, bucketID int64, name string, redundancy cdssdk.TypedRedundancyInfo) *StorageCreatePackage { +func NewStorageCreatePackage(userID cdssdk.UserID, storageID cdssdk.StorageID, filePath string, bucketID cdssdk.BucketID, name string) *StorageCreatePackage { return &StorageCreatePackage{ - UserID: userID, - StorageID: storageID, - Path: filePath, - BucketID: bucketID, - Name: name, - Redundancy: redundancy, + UserID: userID, + StorageID: storageID, + Path: filePath, + BucketID: bucketID, + Name: name, } } -func NewStorageCreatePackageStatus(status string, err string, packageID int64) *StorageCreatePackageStatus { +func NewStorageCreatePackageStatus(status string, err string, packageID cdssdk.PackageID) *StorageCreatePackageStatus { return &StorageCreatePackageStatus{ Status: status, Error: err, diff --git a/common/pkgs/mq/executor/task/storage_load_package.go b/common/pkgs/mq/executor/task/storage_load_package.go index 9056895..4b62405 100644 --- a/common/pkgs/mq/executor/task/storage_load_package.go +++ b/common/pkgs/mq/executor/task/storage_load_package.go @@ -1,10 +1,12 @@ package task +import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + type StorageLoadPackage struct { TaskInfoBase - UserID int64 `json:"userID"` - PackageID int64 `json:"packageID"` - StorageID int64 `json:"storageID"` + UserID cdssdk.UserID `json:"userID"` + PackageID cdssdk.PackageID `json:"packageID"` + StorageID cdssdk.StorageID `json:"storageID"` } type StorageLoadPackageStatus struct { TaskStatusBase @@ -12,7 +14,7 @@ type StorageLoadPackageStatus struct { FullPath string `json:"fullPath"` } -func NewStorageLoadPackage(userID int64, packageID int64, storageID int64) *StorageLoadPackage { +func NewStorageLoadPackage(userID cdssdk.UserID, packageID cdssdk.PackageID, storageID cdssdk.StorageID) *StorageLoadPackage { return &StorageLoadPackage{ UserID: userID, PackageID: packageID, diff --git a/common/pkgs/mq/manager/job.go b/common/pkgs/mq/manager/job.go index ca933df..0fed86d 100644 --- a/common/pkgs/mq/manager/job.go +++ b/common/pkgs/mq/manager/job.go @@ -3,6 +3,7 @@ package manager import ( "gitlink.org.cn/cloudream/common/pkgs/mq" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" ) @@ -49,16 +50,16 @@ var _ = Register(Service.JobSetLocalFileUploaded) type JobSetLocalFileUploaded struct { mq.MessageBodyBase - JobSetID schsdk.JobSetID `json:"jobSetID"` - LocalPath string `json:"localPath"` - Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因 - PackageID int64 `json:"packageID"` // 如果上传文件成功,那么这个字段是上传之后得到的PackageID + JobSetID schsdk.JobSetID `json:"jobSetID"` + LocalPath string `json:"localPath"` + Error string `json:"error"` // 如果上传文件失败,那么这个字段说明了失败原因 + PackageID cdssdk.PackageID `json:"packageID"` // 如果上传文件成功,那么这个字段是上传之后得到的PackageID } type JobSetLocalFileUploadedResp struct { mq.MessageBodyBase } -func NewJobSetLocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, err string, packageID int64) *JobSetLocalFileUploaded { +func NewJobSetLocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, err string, packageID cdssdk.PackageID) *JobSetLocalFileUploaded { return &JobSetLocalFileUploaded{ JobSetID: jobSetID, LocalPath: localPath, diff --git a/common/utils/utils.go b/common/utils/utils.go index 23c0d2c..b302c83 100644 --- a/common/utils/utils.go +++ b/common/utils/utils.go @@ -7,10 +7,11 @@ import ( "time" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" ) -func MakeJobOutputFullPath(stgDir string, userID int64, jobID schsdk.JobID) string { - return filepath.Join(stgDir, strconv.FormatInt(userID, 10), "jobs", string(jobID), "output") +func MakeJobOutputFullPath(stgDir string, userID cdssdk.UserID, jobID schsdk.JobID) string { + return filepath.Join(stgDir, strconv.FormatInt(int64(userID), 10), "jobs", string(jobID), "output") } func MakeResourcePackageName(jobID schsdk.JobID) string { diff --git a/executor/internal/task/cache_move_package.go b/executor/internal/task/cache_move_package.go index 132c396..b7d4c70 100644 --- a/executor/internal/task/cache_move_package.go +++ b/executor/internal/task/cache_move_package.go @@ -26,11 +26,11 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext log.Debugf("begin with %v", logger.FormatStruct(t.CacheMovePackage)) defer log.Debugf("end") - cacheInfos, err := t.do(ctx) + err := t.do(ctx) if err != nil { - ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error(), cacheInfos)) + ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error())) } else { - ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("", cacheInfos)) + ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("")) } ctx.reporter.ReportNow() @@ -39,23 +39,23 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext }) } -func (t *CacheMovePackage) do(ctx TaskContext) ([]cdssdk.ObjectCacheInfo, error) { +func (t *CacheMovePackage) do(ctx TaskContext) error { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { - return nil, fmt.Errorf("new cloudream storage client: %w", err) + return fmt.Errorf("new cloudream storage client: %w", err) } defer schglb.CloudreamStoragePool.Release(stgCli) - resp, err := stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ + _, err = stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ UserID: t.UserID, PackageID: t.PackageID, NodeID: t.StgNodeID, }) if err != nil { - return nil, err + return err } - return resp.CacheInfos, err + return err } func init() { diff --git a/executor/internal/task/storage_create_package.go b/executor/internal/task/storage_create_package.go index acf8552..9281f0b 100644 --- a/executor/internal/task/storage_create_package.go +++ b/executor/internal/task/storage_create_package.go @@ -46,12 +46,11 @@ func (t *StorageCreatePackage) do(taskID string, ctx TaskContext) error { defer schglb.CloudreamStoragePool.Release(stgCli) resp, err := stgCli.StorageCreatePackage(cdssdk.StorageCreatePackageReq{ - UserID: t.UserID, - StorageID: t.StorageID, - Path: t.Path, - BucketID: t.BucketID, - Name: t.Name, - Redundancy: t.Redundancy, + UserID: t.UserID, + StorageID: t.StorageID, + Path: t.Path, + BucketID: t.BucketID, + Name: t.Name, }) if err != nil { return err diff --git a/manager/internal/jobmgr/adjusting_handler.go b/manager/internal/jobmgr/adjusting_handler.go index b5b587e..4e64b4f 100644 --- a/manager/internal/jobmgr/adjusting_handler.go +++ b/manager/internal/jobmgr/adjusting_handler.go @@ -55,7 +55,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { colCli, err := schglb.CollectorMQPool.Acquire() if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err.Error()), job.GetState())) return } defer schglb.CollectorMQPool.Release(colCli) @@ -68,7 +68,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { stgCli, err := schglb.CloudreamStoragePool.Acquire() if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err.Error()), job.GetState())) return } defer schglb.CloudreamStoragePool.Release(stgCli) @@ -77,7 +77,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) { StorageID: ccInfo.CDSStorageID, }) if err != nil { - h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err), job.GetState())) + h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err.Error()), job.GetState())) return } @@ -277,11 +277,22 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob, return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) } - if len(cacheMoveRet.CacheInfos) != 1 { + stgCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new cloudream storage client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(stgCli) + + pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID}) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + if len(pkgObjs.Objects) != 1 { return fmt.Errorf("there must be only 1 object in the package that will be imported") } - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash))) if err != nil { return fmt.Errorf("starting import image: %w", err) } diff --git a/manager/internal/jobmgr/event/local_file_uploaded.go b/manager/internal/jobmgr/event/local_file_uploaded.go index c5f0de2..4c261ea 100644 --- a/manager/internal/jobmgr/event/local_file_uploaded.go +++ b/manager/internal/jobmgr/event/local_file_uploaded.go @@ -1,16 +1,19 @@ package event -import schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" +import ( + schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" +) // 本地文件上传结束 type LocalFileUploaded struct { JobSetID schsdk.JobSetID LocalPath string Error string - PackageID int64 + PackageID cdssdk.PackageID } -func NewLocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, err string, packageID int64) *LocalFileUploaded { +func NewLocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, err string, packageID cdssdk.PackageID) *LocalFileUploaded { return &LocalFileUploaded{ JobSetID: jobSetID, LocalPath: localPath, diff --git a/manager/internal/jobmgr/executing_handler.go b/manager/internal/jobmgr/executing_handler.go index 142ba98..32d9312 100644 --- a/manager/internal/jobmgr/executing_handler.go +++ b/manager/internal/jobmgr/executing_handler.go @@ -186,7 +186,6 @@ func (h *ExecutingHandler) onResourceJobEvent(evt event.Event, job *executingJob tarNorJob.OutputFullPath, resJob.Info.BucketID, utils.MakeResourcePackageName(resJob.JobID), - resJob.Info.Redundancy, )) if err != nil { h.changeJobState(job.job, jobmod.NewStateFailed(err.Error(), job.state)) diff --git a/manager/internal/jobmgr/jobmgr.go b/manager/internal/jobmgr/jobmgr.go index c6cfa40..5393159 100644 --- a/manager/internal/jobmgr/jobmgr.go +++ b/manager/internal/jobmgr/jobmgr.go @@ -9,6 +9,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/logger" schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler" + cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" myreflect "gitlink.org.cn/cloudream/common/utils/reflect" jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" @@ -170,7 +171,7 @@ func (m *Manager) SubmitJobSet(jobSetInfo schsdk.JobSetInfo, preScheduleScheme j return jobSet, nil } -func (m *Manager) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, err string, packageID int64) error { +func (m *Manager) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath string, err string, packageID cdssdk.PackageID) error { m.pubLock.Lock() defer m.pubLock.Unlock() diff --git a/manager/internal/jobmgr/prescheduling_handler.go b/manager/internal/jobmgr/prescheduling_handler.go index ff01050..db8f590 100644 --- a/manager/internal/jobmgr/prescheduling_handler.go +++ b/manager/internal/jobmgr/prescheduling_handler.go @@ -345,15 +345,26 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) } - if len(cacheMoveRet.CacheInfos) == 0 { + stgCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new cloudream storage client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(stgCli) + + pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID}) + if err != nil { + return fmt.Errorf("getting package objects: %w", err) + } + + if len(pkgObjs.Objects) == 0 { return fmt.Errorf("no object in the package which will be imported") } - if len(cacheMoveRet.CacheInfos) > 1 { + if len(pkgObjs.Objects) > 1 { return fmt.Errorf("there must be only 1 object in the package which will be imported") } - fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) + fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash))) if err != nil { return fmt.Errorf("starting import image: %w", err) }