JCC-CSScheduler/advisor/internal/reporter/reporter.go

86 lines
2.1 KiB
Go

package reporter
import (
"fmt"
"sync"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
// Reporter accumulates the most recent status of each task and hands the
// accumulated statuses to the manager from its Serve loop.
type Reporter struct {
	// advisorID is attached to every report sent to the manager.
	advisorID schmod.AdvisorID
	// reportInterval is the period of the ticker that drives Serve.
	reportInterval time.Duration

	// reportNow carries requests for an immediate report from ReportNow
	// to the Serve loop.
	reportNow chan bool

	// taskStatusLock guards taskStatus.
	taskStatusLock sync.Mutex
	// taskStatus maps a task ID to the latest status that has not yet
	// been reported successfully.
	taskStatus map[string]advtsk.TaskStatus
}
// NewReporter builds a Reporter for the given advisor that reports every
// reportInterval once Serve is running. The returned Reporter starts with
// an empty status table and an unbuffered report-now channel.
func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) *Reporter {
	rep := new(Reporter)
	rep.advisorID = advisorID
	rep.reportInterval = reportInterval
	rep.taskStatus = map[string]advtsk.TaskStatus{}
	rep.reportNow = make(chan bool)
	return rep
}
// Report records taskStatus as the latest status of taskID, replacing any
// status previously recorded for that task. Nothing is sent here; the
// recorded status is picked up by the Serve loop.
func (r *Reporter) Report(taskID string, taskStatus advtsk.TaskStatus) {
	mu := &r.taskStatusLock
	mu.Lock()
	defer mu.Unlock()

	r.taskStatus[taskID] = taskStatus
}
// ReportNow asks the Serve loop to report immediately instead of waiting
// for the next tick. It never blocks: if nothing is currently receiving
// on the channel, the request is dropped.
func (r *Reporter) ReportNow() {
	wake := r.reportNow

	select {
	case wake <- true:
		// A receiver was ready and took the request.
	default:
		// No receiver ready; give up rather than block the caller.
	}
}
// Serve runs the reporting loop. On every tick of reportInterval, or as
// soon as ReportNow is called, it sends all statuses recorded through
// Report to the manager.
//
// It returns an error if reportInterval is not positive or if a manager
// client cannot be acquired from schglb.ManagerMQPool. Once the loop has
// started it does not return: a failed report is logged and its statuses
// are kept so they are included in the next attempt.
func (r *Reporter) Serve() error {
	// time.NewTicker and Ticker.Reset panic on a non-positive duration;
	// reject a bad configuration with an error instead of crashing.
	if r.reportInterval <= 0 {
		return fmt.Errorf("invalid report interval: %s", r.reportInterval)
	}

	magCli, err := schglb.ManagerMQPool.Acquire()
	if err != nil {
		return fmt.Errorf("new manager client: %w", err)
	}
	defer schglb.ManagerMQPool.Release(magCli)

	ticker := time.NewTicker(r.reportInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
		case <-r.reportNow:
			// An on-demand report restarts the interval.
			ticker.Reset(r.reportInterval)
		}

		pending := r.takePendingStatus()

		_, reportErr := magCli.ReportAdvisorTaskStatus(mgrmq.NewReportAdvisorTaskStatus(r.advisorID, pending))
		if reportErr != nil {
			logger.Warnf("reporting to manager: %s", reportErr.Error())
			// If reporting fails, the data must be kept.
			r.restorePendingStatus(pending)
		}
	}
}

// takePendingStatus converts every recorded status into a report entry and
// replaces the status table with an empty one.
func (r *Reporter) takePendingStatus() []mgrmq.AdvisorTaskStatus {
	r.taskStatusLock.Lock()
	defer r.taskStatusLock.Unlock()

	// Deliberately left nil when nothing was recorded, so the payload for
	// an empty report is unchanged.
	var pending []mgrmq.AdvisorTaskStatus
	for taskID, status := range r.taskStatus {
		pending = append(pending, mgrmq.NewAdvisorTaskStatus(taskID, status))
	}
	r.taskStatus = make(map[string]advtsk.TaskStatus)
	return pending
}

// restorePendingStatus puts the entries of a failed report back into the
// status table.
func (r *Reporter) restorePendingStatus(pending []mgrmq.AdvisorTaskStatus) {
	r.taskStatusLock.Lock()
	defer r.taskStatusLock.Unlock()

	for _, ts := range pending {
		// A status recorded for the same task while the report was in
		// flight is newer than the one being restored; do not overwrite it.
		if _, exists := r.taskStatus[ts.TaskID]; !exists {
			r.taskStatus[ts.TaskID] = ts.Status
		}
	}
}