优化接口

This commit is contained in:
Sydonian 2023-12-06 09:46:48 +08:00
parent 2496392e03
commit 3edb511280
6 changed files with 45 additions and 26 deletions

View File

@ -26,7 +26,7 @@ func (svc *Service) PackageGetCachedStgNodes(msg *colmq.PackageGetCachedStgNodes
return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed") return nil, mq.Failed(errorcode.OperationFailed, "get package cached stg nodes failed")
} }
return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize, resp.RedunancyType)) return mq.ReplyOK(colmq.NewPackageGetCachedStgNodesResp(resp.NodeInfos, resp.PackageSize))
} }
func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) { func (svc *Service) PackageGetLoadedStgNodes(msg *colmq.PackageGetLoadedStgNodes) (*colmq.PackageGetLoadedStgNodesResp, *mq.CodeMessage) {

View File

@ -30,12 +30,11 @@ func NewPackageGetCachedStgNodes(userID cdssdk.UserID, packageID cdssdk.PackageI
PackageID: packageID, PackageID: packageID,
} }
} }
func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64, redunancyType string) *PackageGetCachedStgNodesResp { func NewPackageGetCachedStgNodesResp(nodeInfos []cdssdk.NodePackageCachingInfo, packageSize int64) *PackageGetCachedStgNodesResp {
return &PackageGetCachedStgNodesResp{ return &PackageGetCachedStgNodesResp{
PackageCachingInfo: cdssdk.PackageCachingInfo{ PackageCachingInfo: cdssdk.PackageCachingInfo{
NodeInfos: nodeInfos, NodeInfos: nodeInfos,
PackageSize: packageSize, PackageSize: packageSize,
RedunancyType: redunancyType,
}, },
} }
} }

View File

@ -11,7 +11,6 @@ type CacheMovePackage struct {
type CacheMovePackageStatus struct { type CacheMovePackageStatus struct {
TaskStatusBase TaskStatusBase
Error string `json:"error"` Error string `json:"error"`
CacheInfos []cdssdk.ObjectCacheInfo `json:"cacheInfos"`
} }
func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage { func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNodeID cdssdk.NodeID) *CacheMovePackage {
@ -21,10 +20,9 @@ func NewCacheMovePackage(userID cdssdk.UserID, packageID cdssdk.PackageID, stgNo
StgNodeID: stgNodeID, StgNodeID: stgNodeID,
} }
} }
func NewCacheMovePackageStatus(err string, cacheInfos []cdssdk.ObjectCacheInfo) *CacheMovePackageStatus { func NewCacheMovePackageStatus(err string) *CacheMovePackageStatus {
return &CacheMovePackageStatus{ return &CacheMovePackageStatus{
Error: err, Error: err,
CacheInfos: cacheInfos,
} }
} }

View File

@ -26,11 +26,11 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext
log.Debugf("begin with %v", logger.FormatStruct(t.CacheMovePackage)) log.Debugf("begin with %v", logger.FormatStruct(t.CacheMovePackage))
defer log.Debugf("end") defer log.Debugf("end")
cacheInfos, err := t.do(ctx) err := t.do(ctx)
if err != nil { if err != nil {
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error(), cacheInfos)) ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(err.Error()))
} else { } else {
ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus("", cacheInfos)) ctx.reporter.Report(task.ID(), exectsk.NewCacheMovePackageStatus(""))
} }
ctx.reporter.ReportNow() ctx.reporter.ReportNow()
@ -39,23 +39,23 @@ func (t *CacheMovePackage) Execute(task *task.Task[TaskContext], ctx TaskContext
}) })
} }
func (t *CacheMovePackage) do(ctx TaskContext) ([]cdssdk.ObjectCacheInfo, error) { func (t *CacheMovePackage) do(ctx TaskContext) error {
stgCli, err := schglb.CloudreamStoragePool.Acquire() stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil { if err != nil {
return nil, fmt.Errorf("new cloudream storage client: %w", err) return fmt.Errorf("new cloudream storage client: %w", err)
} }
defer schglb.CloudreamStoragePool.Release(stgCli) defer schglb.CloudreamStoragePool.Release(stgCli)
resp, err := stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{ _, err = stgCli.CacheMovePackage(cdssdk.CacheMovePackageReq{
UserID: t.UserID, UserID: t.UserID,
PackageID: t.PackageID, PackageID: t.PackageID,
NodeID: t.StgNodeID, NodeID: t.StgNodeID,
}) })
if err != nil { if err != nil {
return nil, err return err
} }
return resp.CacheInfos, err return err
} }
func init() { func init() {

View File

@ -55,7 +55,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) {
colCli, err := schglb.CollectorMQPool.Acquire() colCli, err := schglb.CollectorMQPool.Acquire()
if err != nil { if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err), job.GetState())) h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new collector client: %s", err.Error()), job.GetState()))
return return
} }
defer schglb.CollectorMQPool.Release(colCli) defer schglb.CollectorMQPool.Release(colCli)
@ -68,7 +68,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) {
stgCli, err := schglb.CloudreamStoragePool.Acquire() stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil { if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err), job.GetState())) h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("new cloudream storage client: %s", err.Error()), job.GetState()))
return return
} }
defer schglb.CloudreamStoragePool.Release(stgCli) defer schglb.CloudreamStoragePool.Release(stgCli)
@ -77,7 +77,7 @@ func (h *AdjustingHandler) Handle(job jobmod.Job) {
StorageID: ccInfo.CDSStorageID, StorageID: ccInfo.CDSStorageID,
}) })
if err != nil { if err != nil {
h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err), job.GetState())) h.changeJobState(job, jobmod.NewStateFailed(fmt.Sprintf("getting cloudream storage info: %s", err.Error()), job.GetState()))
return return
} }
@ -277,11 +277,22 @@ func (h *AdjustingHandler) doImageScheduling(evt event.Event, job *adjustingJob,
return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
} }
if len(cacheMoveRet.CacheInfos) != 1 { stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}
if len(pkgObjs.Objects) != 1 {
return fmt.Errorf("there must be only 1 object in the package that will be imported") return fmt.Errorf("there must be only 1 object in the package that will be imported")
} }
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)))
if err != nil { if err != nil {
return fmt.Errorf("starting import image: %w", err) return fmt.Errorf("starting import image: %w", err)
} }

View File

@ -345,15 +345,26 @@ func (h *PreSchedulingHandler) doImageScheduling(evt event.Event, job *preSchedu
return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error) return fmt.Errorf("cache move pacakge: %s", cacheMoveRet.Error)
} }
if len(cacheMoveRet.CacheInfos) == 0 { stgCli, err := schglb.CloudreamStoragePool.Acquire()
if err != nil {
return fmt.Errorf("new cloudream storage client: %w", err)
}
defer schglb.CloudreamStoragePool.Release(stgCli)
pkgObjs, err := stgCli.ObjectGetPackageObjects(cdssdk.ObjectGetPackageObjectsReq{UserID: 0, PackageID: *file.PackageID})
if err != nil {
return fmt.Errorf("getting package objects: %w", err)
}
if len(pkgObjs.Objects) == 0 {
return fmt.Errorf("no object in the package which will be imported") return fmt.Errorf("no object in the package which will be imported")
} }
if len(cacheMoveRet.CacheInfos) > 1 { if len(pkgObjs.Objects) > 1 {
return fmt.Errorf("there must be only 1 object in the package which will be imported") return fmt.Errorf("there must be only 1 object in the package which will be imported")
} }
fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(cacheMoveRet.CacheInfos[0].FileHash))) fullTaskID, err := h.mgr.execMgr.StartTask(job.job.JobID, exectsk.NewUploadImage(job.ccInfo.PCMParticipantID, cdssdk.MakeIPFSFilePath(pkgObjs.Objects[0].FileHash)))
if err != nil { if err != nil {
return fmt.Errorf("starting import image: %w", err) return fmt.Errorf("starting import image: %w", err)
} }