forked from JointCloud/JCC-CSScheduler
86 lines
1.5 KiB
Go
86 lines
1.5 KiB
Go
package scheduler
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/samber/lo"
|
|
|
|
"gitlink.org.cn/cloudream/common/pkgs/future"
|
|
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
|
|
)
|
|
|
|
type schedulingJob struct {
|
|
Job jobmod.NormalJob
|
|
Callback *future.SetValueFuture[*jobmod.JobScheduleScheme]
|
|
}
|
|
|
|
type Service struct {
|
|
scheduler Scheduler
|
|
jobs []*schedulingJob
|
|
lock sync.Mutex
|
|
hasNewJob chan bool
|
|
}
|
|
|
|
func NewService(scheduler Scheduler) *Service {
|
|
return &Service{
|
|
scheduler: scheduler,
|
|
hasNewJob: make(chan bool),
|
|
}
|
|
}
|
|
|
|
func (s *Service) MakeScheme(job jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
|
|
s.lock.Lock()
|
|
callback := future.NewSetValue[*jobmod.JobScheduleScheme]()
|
|
s.jobs = append(s.jobs, &schedulingJob{
|
|
Job: job,
|
|
Callback: callback,
|
|
})
|
|
s.lock.Unlock()
|
|
|
|
select {
|
|
case s.hasNewJob <- true:
|
|
default:
|
|
}
|
|
|
|
return callback.WaitValue(context.Background())
|
|
}
|
|
|
|
func (s *Service) Serve() error {
|
|
ticker := time.NewTicker(time.Second * 5)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
s.tryMakeScheme()
|
|
case <-s.hasNewJob:
|
|
s.tryMakeScheme()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Service) tryMakeScheme() {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
for i, job := range s.jobs {
|
|
scheme, err := s.scheduler.Schedule(&job.Job)
|
|
if err == nil {
|
|
job.Callback.SetValue(scheme)
|
|
s.jobs[i] = nil
|
|
continue
|
|
}
|
|
|
|
if err == ErrNoAvailableScheme {
|
|
continue
|
|
}
|
|
|
|
job.Callback.SetError(err)
|
|
s.jobs[i] = nil
|
|
}
|
|
|
|
s.jobs = lo.Reject(s.jobs, func(item *schedulingJob, idx int) bool { return item == nil })
|
|
}
|