JCC-CSScheduler/advisor/internal/scheduler/service.go

86 lines
1.5 KiB
Go

package scheduler
import (
"context"
"sync"
"time"
"github.com/samber/lo"
"gitlink.org.cn/cloudream/common/pkgs/future"
jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
)
type schedulingJob struct {
Job jobmod.NormalJob
Callback *future.SetValueFuture[*jobmod.JobScheduleScheme]
}
type Service struct {
scheduler Scheduler
jobs []*schedulingJob
lock sync.Mutex
hasNewJob chan bool
}
func NewService(scheduler Scheduler) *Service {
return &Service{
scheduler: scheduler,
hasNewJob: make(chan bool),
}
}
func (s *Service) MakeScheme(job jobmod.NormalJob) (*jobmod.JobScheduleScheme, error) {
s.lock.Lock()
callback := future.NewSetValue[*jobmod.JobScheduleScheme]()
s.jobs = append(s.jobs, &schedulingJob{
Job: job,
Callback: callback,
})
s.lock.Unlock()
select {
case s.hasNewJob <- true:
default:
}
return callback.WaitValue(context.Background())
}
func (s *Service) Serve() error {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.tryMakeScheme()
case <-s.hasNewJob:
s.tryMakeScheme()
}
}
}
func (s *Service) tryMakeScheme() {
s.lock.Lock()
defer s.lock.Unlock()
for i, job := range s.jobs {
scheme, err := s.scheduler.Schedule(&job.Job)
if err == nil {
job.Callback.SetValue(scheme)
s.jobs[i] = nil
continue
}
if err == ErrNoAvailableScheme {
continue
}
job.Callback.SetError(err)
s.jobs[i] = nil
}
s.jobs = lo.Reject(s.jobs, func(item *schedulingJob, idx int) bool { return item == nil })
}