forked from JointCloud/JCC-CSScheduler
Merge branch 'feature_gxh'
This commit is contained in:
commit
738fc1267a
|
@ -15,7 +15,7 @@ import (
|
||||||
type Reporter struct {
|
type Reporter struct {
|
||||||
advisorID schmod.AdvisorID
|
advisorID schmod.AdvisorID
|
||||||
reportInterval time.Duration
|
reportInterval time.Duration
|
||||||
taskStatus map[string]advtsk.AdvTaskStatus
|
taskStatus map[string]advtsk.TaskStatus
|
||||||
taskStatusLock sync.Mutex
|
taskStatusLock sync.Mutex
|
||||||
reportNow chan bool
|
reportNow chan bool
|
||||||
}
|
}
|
||||||
|
@ -24,12 +24,12 @@ func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) *Repo
|
||||||
return &Reporter{
|
return &Reporter{
|
||||||
advisorID: advisorID,
|
advisorID: advisorID,
|
||||||
reportInterval: reportInterval,
|
reportInterval: reportInterval,
|
||||||
taskStatus: make(map[string]advtsk.AdvTaskStatus),
|
taskStatus: make(map[string]advtsk.TaskStatus),
|
||||||
reportNow: make(chan bool),
|
reportNow: make(chan bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) Report(taskID string, taskStatus advtsk.AdvTaskStatus) {
|
func (r *Reporter) Report(taskID string, taskStatus advtsk.TaskStatus) {
|
||||||
r.taskStatusLock.Lock()
|
r.taskStatusLock.Lock()
|
||||||
defer r.taskStatusLock.Unlock()
|
defer r.taskStatusLock.Unlock()
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ func (r *Reporter) Serve() error {
|
||||||
for taskID, status := range r.taskStatus {
|
for taskID, status := range r.taskStatus {
|
||||||
taskStatus = append(taskStatus, mgrmq.NewAdvisorTaskStatus(taskID, status))
|
taskStatus = append(taskStatus, mgrmq.NewAdvisorTaskStatus(taskID, status))
|
||||||
}
|
}
|
||||||
r.taskStatus = make(map[string]advtsk.AdvTaskStatus)
|
r.taskStatus = make(map[string]advtsk.TaskStatus)
|
||||||
r.taskStatusLock.Unlock()
|
r.taskStatusLock.Unlock()
|
||||||
|
|
||||||
_, err := magCli.ReportAdvisorTaskStatus(mgrmq.NewReportAdvisorTaskStatus(r.advisorID, taskStatus))
|
_, err := magCli.ReportAdvisorTaskStatus(mgrmq.NewReportAdvisorTaskStatus(r.advisorID, taskStatus))
|
||||||
|
|
|
@ -39,7 +39,7 @@ func NewManager(reporter *reporter.Reporter, scheduleSvc *scheduler.Service) Man
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) StartByInfo(info advtsk.AdvTaskInfo) (*Task, error) {
|
func (m *Manager) StartByInfo(info advtsk.TaskInfo) (*Task, error) {
|
||||||
infoType := myreflect.TypeOfValue(info)
|
infoType := myreflect.TypeOfValue(info)
|
||||||
|
|
||||||
ctor, ok := taskFromInfoCtors[infoType]
|
ctor, ok := taskFromInfoCtors[infoType]
|
||||||
|
@ -50,10 +50,10 @@ func (m *Manager) StartByInfo(info advtsk.AdvTaskInfo) (*Task, error) {
|
||||||
return m.StartNew(ctor(info)), nil
|
return m.StartNew(ctor(info)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskFromInfoCtors map[reflect.Type]func(advtsk.AdvTaskInfo) TaskBody = make(map[reflect.Type]func(advtsk.AdvTaskInfo) task.TaskBody[TaskContext])
|
var taskFromInfoCtors map[reflect.Type]func(advtsk.TaskInfo) TaskBody = make(map[reflect.Type]func(advtsk.TaskInfo) task.TaskBody[TaskContext])
|
||||||
|
|
||||||
func Register[TInfo advtsk.AdvTaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
|
func Register[TInfo advtsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
|
||||||
taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info advtsk.AdvTaskInfo) TaskBody {
|
taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info advtsk.TaskInfo) TaskBody {
|
||||||
return ctor(info.(TInfo))
|
return ctor(info.(TInfo))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,7 @@ import (
|
||||||
"gitlink.org.cn/cloudream/common/consts/errorcode"
|
"gitlink.org.cn/cloudream/common/consts/errorcode"
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
||||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||||
|
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobSetService struct {
|
type JobSetService struct {
|
||||||
|
@ -35,14 +36,14 @@ func (s *JobSetService) Submit(ctx *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
jobSetInfo, err := schsdk.JobSetInfoFromJSON(bodyData)
|
jobSetInfo, err := serder.JSONToObjectEx[schsdk.JobSetInfo](bodyData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("parsing request body: %s", err.Error())
|
log.Warnf("parsing request body: %s", err.Error())
|
||||||
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
|
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "parse request body failed"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
jobsetID, uploadScheme, err := s.svc.JobSetSvc().Submit(*jobSetInfo)
|
jobsetID, uploadScheme, err := s.svc.JobSetSvc().Submit(jobSetInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("submitting jobset: %s", err.Error())
|
log.Warnf("submitting jobset: %s", err.Error())
|
||||||
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "submit jobset failed"))
|
ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "submit jobset failed"))
|
||||||
|
|
|
@ -2,9 +2,9 @@ package jobmod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/samber/lo"
|
"github.com/samber/lo"
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/types"
|
"gitlink.org.cn/cloudream/common/pkgs/types"
|
||||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||||
|
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FileScheduleAction string
|
type FileScheduleAction string
|
||||||
|
@ -75,7 +75,7 @@ var JobTypeUnion = types.NewTypeUnion[Job](
|
||||||
(*NormalJob)(nil),
|
(*NormalJob)(nil),
|
||||||
(*ResourceJob)(nil),
|
(*ResourceJob)(nil),
|
||||||
)
|
)
|
||||||
var _ = mq.RegisterUnionType(JobTypeUnion)
|
var _ = serder.UseTypeUnionExternallyTagged(&JobTypeUnion)
|
||||||
|
|
||||||
// TODO var _ = serder.RegisterNewTaggedTypeUnion(JobTypeUnion, "Type", "type")
|
// TODO var _ = serder.RegisterNewTaggedTypeUnion(JobTypeUnion, "Type", "type")
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
package jobmod
|
package jobmod
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/types"
|
"gitlink.org.cn/cloudream/common/pkgs/types"
|
||||||
|
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||||
)
|
)
|
||||||
|
|
||||||
type JobState interface {
|
type JobState interface {
|
||||||
|
@ -20,7 +20,7 @@ var JobStateTypeUnion = types.NewTypeUnion[JobState](
|
||||||
(*StateFailed)(nil),
|
(*StateFailed)(nil),
|
||||||
(*StateSuccess)(nil),
|
(*StateSuccess)(nil),
|
||||||
)
|
)
|
||||||
var _ = mq.RegisterUnionType(JobStateTypeUnion)
|
var _ = serder.UseTypeUnionExternallyTagged(&JobStateTypeUnion)
|
||||||
|
|
||||||
// TODO var _ = serder.RegisterNewTaggedTypeUnion(JobStateTypeUnion, "Type", "type")
|
// TODO var _ = serder.RegisterNewTaggedTypeUnion(JobStateTypeUnion, "Type", "type")
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ var _ = Register(Service.StartTask)
|
||||||
|
|
||||||
type StartTask struct {
|
type StartTask struct {
|
||||||
mq.MessageBodyBase
|
mq.MessageBodyBase
|
||||||
Info advtsk.AdvTaskInfo `json:"info"`
|
Info advtsk.TaskInfo `json:"info"`
|
||||||
}
|
}
|
||||||
type StartTaskResp struct {
|
type StartTaskResp struct {
|
||||||
mq.MessageBodyBase
|
mq.MessageBodyBase
|
||||||
|
@ -23,7 +23,7 @@ type StartTaskResp struct {
|
||||||
TaskID string `json:"taskID"`
|
TaskID string `json:"taskID"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStartTask(info advtsk.AdvTaskInfo) *StartTask {
|
func NewStartTask(info advtsk.TaskInfo) *StartTask {
|
||||||
return &StartTask{
|
return &StartTask{
|
||||||
Info: info,
|
Info: info,
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,8 +4,6 @@ import (
|
||||||
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Register[*MakeAdjustScheme, *MakeAdjustSchemeStatus]()
|
|
||||||
|
|
||||||
type MakeAdjustScheme struct {
|
type MakeAdjustScheme struct {
|
||||||
TaskInfoBase
|
TaskInfoBase
|
||||||
Job jobmod.NormalJob `json:"job"`
|
Job jobmod.NormalJob `json:"job"`
|
||||||
|
@ -29,3 +27,7 @@ func NewMakeAdjustSchemeStatus(err string, scheme jobmod.JobScheduleScheme) *Mak
|
||||||
Scheme: scheme,
|
Scheme: scheme,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Register[*MakeAdjustScheme, *MakeAdjustSchemeStatus]()
|
||||||
|
}
|
||||||
|
|
|
@ -1,48 +1,40 @@
|
||||||
package task
|
package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/types"
|
"gitlink.org.cn/cloudream/common/pkgs/types"
|
||||||
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
|
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
|
||||||
|
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 任务。
|
// 任务。
|
||||||
// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和executor中的重名导致代码异常
|
type TaskInfo interface {
|
||||||
type AdvTaskInfo interface {
|
|
||||||
Noop()
|
Noop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 增加了新类型后需要在这里也同步添加
|
// 增加了新类型后需要在这里也同步添加
|
||||||
var TaskInfoTypeUnion = types.NewTypeUnion[AdvTaskInfo]()
|
var TaskInfoTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[TaskInfo]()))
|
||||||
|
|
||||||
type TaskInfoBase struct{}
|
type TaskInfoBase struct{}
|
||||||
|
|
||||||
func (s *TaskInfoBase) Noop() {}
|
func (s *TaskInfoBase) Noop() {}
|
||||||
|
|
||||||
// 任务上报的状态
|
// 任务上报的状态
|
||||||
// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和executor中的重名导致代码异常
|
type TaskStatus interface {
|
||||||
type AdvTaskStatus interface {
|
|
||||||
Noop()
|
Noop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 增加了新类型后需要在这里也同步添加
|
// 增加了新类型后需要在这里也同步添加
|
||||||
var TaskStatusTypeUnion = types.NewTypeUnion[AdvTaskStatus]()
|
var TaskStatusTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[TaskStatus]()))
|
||||||
|
|
||||||
type TaskStatusBase struct{}
|
type TaskStatusBase struct{}
|
||||||
|
|
||||||
func (s *TaskStatusBase) Noop() {}
|
func (s *TaskStatusBase) Noop() {}
|
||||||
|
|
||||||
// 注:此函数必须以var _ = Register[xxx, xxx]()的形式被调用,这样才能保证init中RegisterUnionType时
|
// 只能在init函数中调用,因为包级变量初始化会比init函数调用先进行
|
||||||
// TypeUnion不是空的。(因为包级变量初始化比init函数调用先进行)
|
func Register[TTaskInfo TaskInfo, TTaskStatus TaskStatus]() any {
|
||||||
func Register[TTaskInfo AdvTaskInfo, TTaskStatus AdvTaskStatus]() any {
|
|
||||||
TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]())
|
TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]())
|
||||||
|
|
||||||
TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]())
|
TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
mq.RegisterUnionType(TaskInfoTypeUnion)
|
|
||||||
mq.RegisterUnionType(TaskStatusTypeUnion)
|
|
||||||
}
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ var _ = Register(Service.StartTask)
|
||||||
|
|
||||||
type StartTask struct {
|
type StartTask struct {
|
||||||
mq.MessageBodyBase
|
mq.MessageBodyBase
|
||||||
Info exectsk.ExeTaskInfo `json:"info"`
|
Info exectsk.TaskInfo `json:"info"`
|
||||||
}
|
}
|
||||||
type StartTaskResp struct {
|
type StartTaskResp struct {
|
||||||
mq.MessageBodyBase
|
mq.MessageBodyBase
|
||||||
|
@ -24,7 +24,7 @@ type StartTaskResp struct {
|
||||||
TaskID string `json:"taskID"`
|
TaskID string `json:"taskID"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewStartTask(info exectsk.ExeTaskInfo) *StartTask {
|
func NewStartTask(info exectsk.TaskInfo) *StartTask {
|
||||||
return &StartTask{
|
return &StartTask{
|
||||||
Info: info,
|
Info: info,
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,8 +2,6 @@ package task
|
||||||
|
|
||||||
import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||||
|
|
||||||
var _ = Register[*CacheMovePackage, *CacheMovePackageStatus]()
|
|
||||||
|
|
||||||
type CacheMovePackage struct {
|
type CacheMovePackage struct {
|
||||||
TaskInfoBase
|
TaskInfoBase
|
||||||
UserID int64 `json:"userID"`
|
UserID int64 `json:"userID"`
|
||||||
|
@ -29,3 +27,7 @@ func NewCacheMovePackageStatus(err string, cacheInfos []cdssdk.ObjectCacheInfo)
|
||||||
CacheInfos: cacheInfos,
|
CacheInfos: cacheInfos,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Register[*CacheMovePackage, *CacheMovePackageStatus]()
|
||||||
|
}
|
||||||
|
|
|
@ -2,8 +2,6 @@ package task
|
||||||
|
|
||||||
import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
import cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
|
||||||
|
|
||||||
var _ = Register[*StorageCreatePackage, *StorageCreatePackageStatus]()
|
|
||||||
|
|
||||||
type StorageCreatePackage struct {
|
type StorageCreatePackage struct {
|
||||||
TaskInfoBase
|
TaskInfoBase
|
||||||
UserID int64 `json:"userID"`
|
UserID int64 `json:"userID"`
|
||||||
|
@ -37,3 +35,7 @@ func NewStorageCreatePackageStatus(status string, err string, packageID int64) *
|
||||||
PackageID: packageID,
|
PackageID: packageID,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Register[*StorageCreatePackage, *StorageCreatePackageStatus]()
|
||||||
|
}
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
package task
|
package task
|
||||||
|
|
||||||
var _ = Register[*StorageLoadPackage, *StorageLoadPackageStatus]()
|
|
||||||
|
|
||||||
type StorageLoadPackage struct {
|
type StorageLoadPackage struct {
|
||||||
TaskInfoBase
|
TaskInfoBase
|
||||||
UserID int64 `json:"userID"`
|
UserID int64 `json:"userID"`
|
||||||
|
@ -27,3 +25,7 @@ func NewStorageLoadPackageStatus(err string, fullPath string) *StorageLoadPackag
|
||||||
FullPath: fullPath,
|
FullPath: fullPath,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Register[*StorageLoadPackage, *StorageLoadPackageStatus]()
|
||||||
|
}
|
||||||
|
|
|
@ -5,8 +5,6 @@ import (
|
||||||
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
schsdk "gitlink.org.cn/cloudream/common/sdks/scheduler"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Register[*SubmitTask, *SubmitTaskStatus]()
|
|
||||||
|
|
||||||
type SubmitTask struct {
|
type SubmitTask struct {
|
||||||
TaskInfoBase
|
TaskInfoBase
|
||||||
PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"`
|
PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"`
|
||||||
|
@ -37,3 +35,7 @@ func NewSubmitTaskStatus(status pcmsdk.TaskStatus, err string) *SubmitTaskStatus
|
||||||
Error: err,
|
Error: err,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Register[*SubmitTask, *SubmitTaskStatus]()
|
||||||
|
}
|
||||||
|
|
|
@ -1,48 +1,40 @@
|
||||||
package task
|
package task
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/mq"
|
|
||||||
"gitlink.org.cn/cloudream/common/pkgs/types"
|
"gitlink.org.cn/cloudream/common/pkgs/types"
|
||||||
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
|
myreflect "gitlink.org.cn/cloudream/common/utils/reflect"
|
||||||
|
"gitlink.org.cn/cloudream/common/utils/serder"
|
||||||
)
|
)
|
||||||
|
|
||||||
// 任务
|
// 任务
|
||||||
// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和advisor中的重名导致代码异常
|
type TaskInfo interface {
|
||||||
type ExeTaskInfo interface {
|
|
||||||
Noop()
|
Noop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 增加了新类型后需要在这里也同步添加
|
// 增加了新类型后需要在这里也同步添加
|
||||||
var TaskInfoTypeUnion = types.NewTypeUnion[ExeTaskInfo]()
|
var TaskInfoTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[TaskInfo]()))
|
||||||
|
|
||||||
type TaskInfoBase struct{}
|
type TaskInfoBase struct{}
|
||||||
|
|
||||||
func (s *TaskInfoBase) Noop() {}
|
func (s *TaskInfoBase) Noop() {}
|
||||||
|
|
||||||
// 任务上报的状态
|
// 任务上报的状态
|
||||||
// 由于json-iter库的缺陷,这个类型名必须加一点前缀,否则会和advisor中的重名导致代码异常
|
type TaskStatus interface {
|
||||||
type ExeTaskStatus interface {
|
|
||||||
Noop()
|
Noop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// 增加了新类型后需要在这里也同步添加
|
// 增加了新类型后需要在这里也同步添加
|
||||||
var TaskStatusTypeUnion = types.NewTypeUnion[ExeTaskStatus]()
|
var TaskStatusTypeUnion = serder.UseTypeUnionExternallyTagged(types.Ref(types.NewTypeUnion[TaskStatus]()))
|
||||||
|
|
||||||
type TaskStatusBase struct{}
|
type TaskStatusBase struct{}
|
||||||
|
|
||||||
func (s *TaskStatusBase) Noop() {}
|
func (s *TaskStatusBase) Noop() {}
|
||||||
|
|
||||||
// 注:此函数必须以var _ = Register[xxx, xxx]()的形式被调用,这样才能保证init中RegisterUnionType时
|
// 只能在init函数中调用,因为包级变量初始化会比init函数调用先进行
|
||||||
// TypeUnion不是空的。(因为包级变量初始化比init函数调用先进行)
|
func Register[TTaskInfo TaskInfo, TTaskStatus TaskStatus]() any {
|
||||||
func Register[TTaskInfo ExeTaskInfo, TTaskStatus ExeTaskStatus]() any {
|
|
||||||
TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]())
|
TaskInfoTypeUnion.Add(myreflect.TypeOf[TTaskInfo]())
|
||||||
|
|
||||||
TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]())
|
TaskStatusTypeUnion.Add(myreflect.TypeOf[TTaskStatus]())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
|
||||||
mq.RegisterUnionType(TaskInfoTypeUnion)
|
|
||||||
mq.RegisterUnionType(TaskStatusTypeUnion)
|
|
||||||
}
|
|
||||||
|
|
|
@ -4,8 +4,6 @@ import (
|
||||||
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
|
pcmsdk "gitlink.org.cn/cloudream/common/sdks/pcm"
|
||||||
)
|
)
|
||||||
|
|
||||||
var _ = Register[*UploadImage, *UploadImageStatus]()
|
|
||||||
|
|
||||||
type UploadImage struct {
|
type UploadImage struct {
|
||||||
TaskInfoBase
|
TaskInfoBase
|
||||||
PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"`
|
PCMParticipantID pcmsdk.ParticipantID `json:"pcmParticipantID"`
|
||||||
|
@ -33,3 +31,7 @@ func NewUploadImageStatus(status string, err string, pcmImageID pcmsdk.ImageID,
|
||||||
Name: name,
|
Name: name,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
Register[*UploadImage, *UploadImageStatus]()
|
||||||
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ type ReportAdvisorTaskStatusResp struct {
|
||||||
}
|
}
|
||||||
type AdvisorTaskStatus struct {
|
type AdvisorTaskStatus struct {
|
||||||
TaskID string
|
TaskID string
|
||||||
Status advtsk.AdvTaskStatus
|
Status advtsk.TaskStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReportAdvisorTaskStatus(advisorID schmod.AdvisorID, taskStatus []AdvisorTaskStatus) *ReportAdvisorTaskStatus {
|
func NewReportAdvisorTaskStatus(advisorID schmod.AdvisorID, taskStatus []AdvisorTaskStatus) *ReportAdvisorTaskStatus {
|
||||||
|
@ -37,7 +37,7 @@ func NewReportAdvisorTaskStatus(advisorID schmod.AdvisorID, taskStatus []Advisor
|
||||||
func NewReportAdvisorTaskStatusResp() *ReportAdvisorTaskStatusResp {
|
func NewReportAdvisorTaskStatusResp() *ReportAdvisorTaskStatusResp {
|
||||||
return &ReportAdvisorTaskStatusResp{}
|
return &ReportAdvisorTaskStatusResp{}
|
||||||
}
|
}
|
||||||
func NewAdvisorTaskStatus(taskID string, status exectsk.ExeTaskStatus) AdvisorTaskStatus {
|
func NewAdvisorTaskStatus(taskID string, status exectsk.TaskStatus) AdvisorTaskStatus {
|
||||||
return AdvisorTaskStatus{
|
return AdvisorTaskStatus{
|
||||||
TaskID: taskID,
|
TaskID: taskID,
|
||||||
Status: status,
|
Status: status,
|
||||||
|
|
|
@ -24,7 +24,7 @@ type ReportExecutorTaskStatusResp struct {
|
||||||
}
|
}
|
||||||
type ExecutorTaskStatus struct {
|
type ExecutorTaskStatus struct {
|
||||||
TaskID string
|
TaskID string
|
||||||
Status exectsk.ExeTaskStatus
|
Status exectsk.TaskStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
|
func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []ExecutorTaskStatus) *ReportExecutorTaskStatus {
|
||||||
|
@ -36,7 +36,7 @@ func NewReportExecutorTaskStatus(executorID schmod.ExecutorID, taskStatus []Exec
|
||||||
func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp {
|
func NewReportExecutorTaskStatusResp() *ReportExecutorTaskStatusResp {
|
||||||
return &ReportExecutorTaskStatusResp{}
|
return &ReportExecutorTaskStatusResp{}
|
||||||
}
|
}
|
||||||
func NewExecutorTaskStatus(taskID string, status exectsk.ExeTaskStatus) ExecutorTaskStatus {
|
func NewExecutorTaskStatus(taskID string, status exectsk.TaskStatus) ExecutorTaskStatus {
|
||||||
return ExecutorTaskStatus{
|
return ExecutorTaskStatus{
|
||||||
TaskID: taskID,
|
TaskID: taskID,
|
||||||
Status: status,
|
Status: status,
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
type Reporter struct {
|
type Reporter struct {
|
||||||
executorID schmod.ExecutorID
|
executorID schmod.ExecutorID
|
||||||
reportInterval time.Duration
|
reportInterval time.Duration
|
||||||
taskStatus map[string]exectsk.ExeTaskStatus
|
taskStatus map[string]exectsk.TaskStatus
|
||||||
taskStatusLock sync.Mutex
|
taskStatusLock sync.Mutex
|
||||||
reportNow chan bool
|
reportNow chan bool
|
||||||
}
|
}
|
||||||
|
@ -24,12 +24,12 @@ func NewReporter(executorID schmod.ExecutorID, reportInterval time.Duration) Rep
|
||||||
return Reporter{
|
return Reporter{
|
||||||
executorID: executorID,
|
executorID: executorID,
|
||||||
reportInterval: reportInterval,
|
reportInterval: reportInterval,
|
||||||
taskStatus: make(map[string]exectsk.ExeTaskStatus),
|
taskStatus: make(map[string]exectsk.TaskStatus),
|
||||||
reportNow: make(chan bool),
|
reportNow: make(chan bool),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reporter) Report(taskID string, taskStatus exectsk.ExeTaskStatus) {
|
func (r *Reporter) Report(taskID string, taskStatus exectsk.TaskStatus) {
|
||||||
r.taskStatusLock.Lock()
|
r.taskStatusLock.Lock()
|
||||||
defer r.taskStatusLock.Unlock()
|
defer r.taskStatusLock.Unlock()
|
||||||
|
|
||||||
|
@ -65,7 +65,7 @@ func (r *Reporter) Serve() error {
|
||||||
for taskID, status := range r.taskStatus {
|
for taskID, status := range r.taskStatus {
|
||||||
taskStatus = append(taskStatus, mgrmq.NewExecutorTaskStatus(taskID, status))
|
taskStatus = append(taskStatus, mgrmq.NewExecutorTaskStatus(taskID, status))
|
||||||
}
|
}
|
||||||
r.taskStatus = make(map[string]exectsk.ExeTaskStatus)
|
r.taskStatus = make(map[string]exectsk.TaskStatus)
|
||||||
r.taskStatusLock.Unlock()
|
r.taskStatusLock.Unlock()
|
||||||
|
|
||||||
_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
|
_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
|
||||||
|
|
|
@ -36,7 +36,7 @@ func NewManager(reporter *reporter.Reporter) Manager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) StartByInfo(info exectsk.ExeTaskInfo) (*Task, error) {
|
func (m *Manager) StartByInfo(info exectsk.TaskInfo) (*Task, error) {
|
||||||
infoType := myreflect.TypeOfValue(info)
|
infoType := myreflect.TypeOfValue(info)
|
||||||
|
|
||||||
ctor, ok := taskFromInfoCtors[infoType]
|
ctor, ok := taskFromInfoCtors[infoType]
|
||||||
|
@ -47,10 +47,10 @@ func (m *Manager) StartByInfo(info exectsk.ExeTaskInfo) (*Task, error) {
|
||||||
return m.StartNew(ctor(info)), nil
|
return m.StartNew(ctor(info)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskFromInfoCtors map[reflect.Type]func(exectsk.ExeTaskInfo) TaskBody = make(map[reflect.Type]func(exectsk.ExeTaskInfo) task.TaskBody[TaskContext])
|
var taskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody = make(map[reflect.Type]func(exectsk.TaskInfo) task.TaskBody[TaskContext])
|
||||||
|
|
||||||
func Register[TInfo exectsk.ExeTaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
|
func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
|
||||||
taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info exectsk.ExeTaskInfo) TaskBody {
|
taskFromInfoCtors[myreflect.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody {
|
||||||
return ctor(info.(TInfo))
|
return ctor(info.(TInfo))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ type AdvisorInfo struct {
|
||||||
lastReportTime time.Time
|
lastReportTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.AdvTaskStatus)
|
type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.TaskStatus)
|
||||||
type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string)
|
type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string)
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
|
@ -86,7 +86,7 @@ func (m *Manager) Report(advID schmod.AdvisorID, taskStatus []mgrmq.AdvisorTaskS
|
||||||
}
|
}
|
||||||
|
|
||||||
// 启动一个Task,并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID
|
// 启动一个Task,并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID
|
||||||
func (m *Manager) StartTask(jobID schsdk.JobID, info advtsk.AdvTaskInfo) (string, error) {
|
func (m *Manager) StartTask(jobID schsdk.JobID, info advtsk.TaskInfo) (string, error) {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ type ExecutorInfo struct {
|
||||||
lastReportTime time.Time
|
lastReportTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus exetsk.ExeTaskStatus)
|
type OnTaskUpdatedCallbackFn func(jobID schsdk.JobID, fullTaskID string, taskStatus exetsk.TaskStatus)
|
||||||
type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string)
|
type OnTimeoutCallbackFn func(jobID schsdk.JobID, fullTaskID string)
|
||||||
|
|
||||||
type Manager struct {
|
type Manager struct {
|
||||||
|
@ -87,7 +87,7 @@ func (m *Manager) Report(execID schmod.ExecutorID, taskStatus []mgrmq.ExecutorTa
|
||||||
}
|
}
|
||||||
|
|
||||||
// 启动一个Task,并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID
|
// 启动一个Task,并将其关联到指定的Job。返回一个在各Executor之间唯一的TaskID
|
||||||
func (m *Manager) StartTask(jobID schsdk.JobID, info exetsk.ExeTaskInfo) (string, error) {
|
func (m *Manager) StartTask(jobID schsdk.JobID, info exetsk.TaskInfo) (string, error) {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer m.lock.Unlock()
|
defer m.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -5,17 +5,17 @@ import advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
|
||||||
// advisor上报任务进度
|
// advisor上报任务进度
|
||||||
type AdvisorTaskUpdated struct {
|
type AdvisorTaskUpdated struct {
|
||||||
FullTaskID string
|
FullTaskID string
|
||||||
TaskStatus advtsk.AdvTaskStatus
|
TaskStatus advtsk.TaskStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewAdvisorTaskUpdated(fullTaskID string, taskStatus advtsk.AdvTaskStatus) *AdvisorTaskUpdated {
|
func NewAdvisorTaskUpdated(fullTaskID string, taskStatus advtsk.TaskStatus) *AdvisorTaskUpdated {
|
||||||
return &AdvisorTaskUpdated{
|
return &AdvisorTaskUpdated{
|
||||||
FullTaskID: fullTaskID,
|
FullTaskID: fullTaskID,
|
||||||
TaskStatus: taskStatus,
|
TaskStatus: taskStatus,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AssertAdvisorTaskStatus[T advtsk.AdvTaskStatus](evt Event, fullTaskID string) (T, error) {
|
func AssertAdvisorTaskStatus[T advtsk.TaskStatus](evt Event, fullTaskID string) (T, error) {
|
||||||
var ret T
|
var ret T
|
||||||
if evt == nil {
|
if evt == nil {
|
||||||
return ret, ErrUnconcernedTask
|
return ret, ErrUnconcernedTask
|
||||||
|
|
|
@ -7,17 +7,17 @@ import (
|
||||||
// executor上报任务进度
|
// executor上报任务进度
|
||||||
type ExecutorTaskUpdated struct {
|
type ExecutorTaskUpdated struct {
|
||||||
FullTaskID string
|
FullTaskID string
|
||||||
TaskStatus exectsk.ExeTaskStatus
|
TaskStatus exectsk.TaskStatus
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewExecutorTaskUpdated(fullTaskID string, taskStatus exectsk.ExeTaskStatus) *ExecutorTaskUpdated {
|
func NewExecutorTaskUpdated(fullTaskID string, taskStatus exectsk.TaskStatus) *ExecutorTaskUpdated {
|
||||||
return &ExecutorTaskUpdated{
|
return &ExecutorTaskUpdated{
|
||||||
FullTaskID: fullTaskID,
|
FullTaskID: fullTaskID,
|
||||||
TaskStatus: taskStatus,
|
TaskStatus: taskStatus,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func AssertExecutorTaskStatus[T exectsk.ExeTaskStatus](evt Event, fullTaskID string) (T, error) {
|
func AssertExecutorTaskStatus[T exectsk.TaskStatus](evt Event, fullTaskID string) (T, error) {
|
||||||
var ret T
|
var ret T
|
||||||
if evt == nil {
|
if evt == nil {
|
||||||
return ret, ErrUnconcernedTask
|
return ret, ErrUnconcernedTask
|
||||||
|
|
|
@ -181,7 +181,7 @@ func (m *Manager) LocalFileUploaded(jobSetID schsdk.JobSetID, localPath string,
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) executorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus exectsk.ExeTaskStatus) {
|
func (m *Manager) executorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus exectsk.TaskStatus) {
|
||||||
m.pubLock.Lock()
|
m.pubLock.Lock()
|
||||||
defer m.pubLock.Unlock()
|
defer m.pubLock.Unlock()
|
||||||
|
|
||||||
|
@ -205,7 +205,7 @@ func (m *Manager) executorTaskTimeout(jobID schsdk.JobID, fullTaskID string) {
|
||||||
job.Handler.OnEvent(event.ToJob(jobID), event.NewExecutorTaskTimeout(fullTaskID))
|
job.Handler.OnEvent(event.ToJob(jobID), event.NewExecutorTaskTimeout(fullTaskID))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) advisorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.AdvTaskStatus) {
|
func (m *Manager) advisorTaskUpdated(jobID schsdk.JobID, fullTaskID string, taskStatus advtsk.TaskStatus) {
|
||||||
m.pubLock.Lock()
|
m.pubLock.Lock()
|
||||||
defer m.pubLock.Unlock()
|
defer m.pubLock.Unlock()
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue