86 lines
2.1 KiB
Go
86 lines
2.1 KiB
Go
package reporter
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitlink.org.cn/cloudream/common/pkgs/logger"
|
|
schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
|
|
schmod "gitlink.org.cn/cloudream/scheduler/common/models"
|
|
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
|
|
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
|
|
)
|
|
|
|
type Reporter struct {
|
|
advisorID schmod.AdvisorID
|
|
reportInterval time.Duration
|
|
taskStatus map[string]advtsk.TaskStatus
|
|
taskStatusLock sync.Mutex
|
|
reportNow chan bool
|
|
}
|
|
|
|
func NewReporter(advisorID schmod.AdvisorID, reportInterval time.Duration) *Reporter {
|
|
return &Reporter{
|
|
advisorID: advisorID,
|
|
reportInterval: reportInterval,
|
|
taskStatus: make(map[string]advtsk.TaskStatus),
|
|
reportNow: make(chan bool),
|
|
}
|
|
}
|
|
|
|
func (r *Reporter) Report(taskID string, taskStatus advtsk.TaskStatus) {
|
|
r.taskStatusLock.Lock()
|
|
defer r.taskStatusLock.Unlock()
|
|
|
|
r.taskStatus[taskID] = taskStatus
|
|
}
|
|
|
|
func (r *Reporter) ReportNow() {
|
|
select {
|
|
case r.reportNow <- true:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (r *Reporter) Serve() error {
|
|
magCli, err := schglb.ManagerMQPool.Acquire()
|
|
if err != nil {
|
|
return fmt.Errorf("new manager client: %w", err)
|
|
}
|
|
defer schglb.ManagerMQPool.Release(magCli)
|
|
|
|
ticker := time.NewTicker(r.reportInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
case <-r.reportNow:
|
|
ticker.Reset(r.reportInterval)
|
|
}
|
|
|
|
r.taskStatusLock.Lock()
|
|
var taskStatus []mgrmq.AdvisorTaskStatus
|
|
for taskID, status := range r.taskStatus {
|
|
taskStatus = append(taskStatus, mgrmq.NewAdvisorTaskStatus(taskID, status))
|
|
}
|
|
r.taskStatus = make(map[string]advtsk.TaskStatus)
|
|
r.taskStatusLock.Unlock()
|
|
|
|
_, err := magCli.ReportAdvisorTaskStatus(mgrmq.NewReportAdvisorTaskStatus(r.advisorID, taskStatus))
|
|
if err != nil {
|
|
logger.Warnf("reporting to manager: %s", err.Error())
|
|
|
|
//若上报失败,数据应保留
|
|
r.taskStatusLock.Lock()
|
|
for _, ts := range taskStatus {
|
|
if _, exists := r.taskStatus[ts.TaskID]; !exists {
|
|
r.taskStatus[ts.TaskID] = ts.Status
|
|
}
|
|
}
|
|
r.taskStatusLock.Unlock()
|
|
}
|
|
}
|
|
}
|