advisor init

This commit is contained in:
songjc 2023-09-06 15:51:11 +08:00
parent efcf4c2c1f
commit 602c8f611f
18 changed files with 609 additions and 12 deletions

2
advisor/README.md Normal file
View File

@ -0,0 +1,2 @@
# scheduler-advisor

View File

@ -0,0 +1,23 @@
package config
import (
log "gitlink.org.cn/cloudream/common/pkgs/logger"
c "gitlink.org.cn/cloudream/common/utils/config"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
// Config holds the advisor settings that Init loads into the package-level cfg value.
type Config struct {
// Logger is the configuration for the logging package.
Logger log.Config `json:"logger"`
// ReportIntervalSec is an interval in seconds, as the field name and JSON key indicate.
ReportIntervalSec int `json:"reportIntervalSec"`
// RabbitMQ is the message-queue configuration.
RabbitMQ mymq.Config `json:"rabbitMQ"`
}
// cfg is the single package-level Config instance; Init fills it and Cfg exposes it.
var cfg Config
// Init loads the configuration named "advisor" into the package-level cfg
// value and hands back whatever error the loader reports.
func Init() error {
	loadErr := c.DefaultLoad("advisor", &cfg)
	return loadErr
}
// Cfg returns a pointer to the package-level configuration.
// The pointer always refers to the same variable, so changes made through it are shared.
func Cfg() *Config {
return &cfg
}

View File

@ -0,0 +1,9 @@
package globals
import "github.com/google/uuid"
// AdvisorID is the identifier of this advisor process; it stays empty until Init is called.
var AdvisorID string
// Init assigns a freshly generated UUID string to AdvisorID.
// Every call replaces the previous value.
func Init() {
AdvisorID = uuid.NewString()
}

View File

@ -0,0 +1,84 @@
package reporter
import (
"fmt"
"sync"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/common/globals"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
mgrmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/manager"
)
// Reporter buffers task statuses and sends them to the manager from its Serve loop.
type Reporter struct {
// advisorID is attached to every report sent by Serve.
advisorID string
// reportInterval is the period of the ticker that drives Serve.
reportInterval time.Duration
// taskStatus maps a task ID to its latest status that has not been sent yet.
// It is guarded by taskStatusLock.
taskStatus map[string]advtsk.TaskStatus
taskStatusLock sync.Mutex
// reportNow lets ReportNow wake Serve up before the next tick.
reportNow chan bool
}
// NewReporter builds a Reporter for the given advisor ID and report interval, starting with an
// empty status map and an unbuffered reportNow channel.
// NOTE(review): the Reporter is returned by value and embeds a sync.Mutex; callers should take
// its address once and avoid copying it afterwards.
func NewReporter(advisorID string, reportInterval time.Duration) Reporter {
return Reporter{
advisorID: advisorID,
reportInterval: reportInterval,
taskStatus: make(map[string]advtsk.TaskStatus),
reportNow: make(chan bool),
}
}
// Report stores taskStatus as the latest status of taskID, replacing any status for that task
// that is still waiting to be sent. Nothing is sent here; Serve performs the actual reporting.
func (r *Reporter) Report(taskID string, taskStatus advtsk.TaskStatus) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
r.taskStatus[taskID] = taskStatus
}
// ReportNow asks Serve to send a report without waiting for the next tick.
// The send never blocks: when Serve is not currently receiving on reportNow, the request is dropped.
func (r *Reporter) ReportNow() {
select {
case r.reportNow <- true:
default:
}
}
// Serve acquires a manager client and then loops forever, sending the buffered task statuses to
// the manager on every tick or whenever ReportNow is signalled.
// It returns only when the manager client cannot be acquired; the loop itself has no exit.
// NOTE(review): time.NewTicker panics on a non-positive interval — confirm reportInterval is
// validated before Serve is started.
func (r *Reporter) Serve() error {
magCli, err := globals.ManagerMQPool.Acquire()
if err != nil {
return fmt.Errorf("new manager client: %w", err)
}
defer magCli.Close()
ticker := time.NewTicker(r.reportInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
case <-r.reportNow:
// An on-demand report restarts the tick period.
ticker.Reset(r.reportInterval)
}
// Snapshot the pending statuses and swap in an empty map while holding the lock,
// so the request below is made without the lock held.
r.taskStatusLock.Lock()
var taskStatus []mgrmq.AdvisorTaskStatus
for taskID, status := range r.taskStatus {
taskStatus = append(taskStatus, mgrmq.NewAdvisorTaskStatus(taskID, status))
}
r.taskStatus = make(map[string]advtsk.TaskStatus)
r.taskStatusLock.Unlock()
// The report is sent even when no task status is pending.
_, err := magCli.ReportAdvisorTaskStatus(mgrmq.NewReportAdvisorTaskStatus(r.advisorID, taskStatus))
if err != nil {
logger.Warnf("reporting to manager: %s", err.Error())
// If reporting failed, the data must be kept: put the snapshot back, but never
// overwrite a status that was recorded for the same task in the meantime.
r.taskStatusLock.Lock()
for _, ts := range taskStatus {
if _, exists := r.taskStatus[ts.TaskID]; !exists {
r.taskStatus[ts.TaskID] = ts.Status
}
}
r.taskStatusLock.Unlock()
}
}
}

View File

@ -0,0 +1,12 @@
package services
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
schtsk "gitlink.org.cn/cloudream/scheduler/advisor/internal/task"
advmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor"
)
// StartGetScheduleScheme starts a new GetScheduleScheme task on the task
// manager and replies OK with the ID of the started task.
// The incoming message is not inspected.
func (svc *Service) StartGetScheduleScheme(msg *advmq.StartGetScheduleScheme) (*advmq.StartGetScheduleSchemeResp, *mq.CodeMessage) {
	body := schtsk.NewGetScheduleScheme()
	started := svc.taskManager.StartNew(body)

	reply := advmq.NewStartGetScheduleSchemeResp(started.ID())
	return mq.ReplyOK(reply)
}

View File

@ -0,0 +1,15 @@
package services
import (
"gitlink.org.cn/cloudream/scheduler/advisor/internal/task"
)
// Service carries the dependencies of the advisor's message handlers.
type Service struct {
// taskManager is the task manager supplied to NewService.
taskManager *task.Manager
}
// NewService returns a Service whose handlers use taskMgr.
func NewService(taskMgr *task.Manager) *Service {
	svc := new(Service)
	svc.taskManager = taskMgr
	return svc
}

View File

@ -0,0 +1,73 @@
package task
import (
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/common/pkgs/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
// GetScheduleScheme is the task body for obtaining a schedule scheme; it currently has no fields.
type GetScheduleScheme struct {
}
// NewGetScheduleScheme allocates an empty GetScheduleScheme task body.
func NewGetScheduleScheme() *GetScheduleScheme {
	return new(GetScheduleScheme)
}
// Execute runs the task body and then finishes the task.
//
// When do fails, a "failed" status carrying the error text is recorded on the
// reporter. In every case the reporter is asked to report immediately, and
// complete is called with the error from do and a removing delay of one minute.
func (t *GetScheduleScheme) Execute(task *task.Task[TaskContext], ctx TaskContext, complete CompleteFn) {
	taskLog := logger.WithType[GetScheduleScheme]("Task")
	taskLog.Debugf("begin")
	defer taskLog.Debugf("end")

	runErr := t.do(task.ID(), ctx)
	if runErr != nil {
		// TODO: adjust the "failed" status value reported on task failure as needed.
		failed := exectsk.NewScheduleTaskStatus("failed", runErr.Error(), 0)
		ctx.reporter.Report(task.ID(), failed)
	}
	ctx.reporter.ReportNow()

	opt := CompleteOption{RemovingDelay: time.Minute}
	complete(runErr, opt)
}
// do is the task body invoked by Execute. Its implementation is currently commented out,
// so it always returns nil without doing any work; taskID and ctx are unused for now.
func (t *GetScheduleScheme) do(taskID string, ctx TaskContext) error {
// pcmCli, err := globals.PCMPool.Acquire()
// if err != nil {
// return fmt.Errorf("new pcm client: %w", err)
// }
// defer pcmCli.Close()
// resp, err := pcmCli.ScheduleTask(pcm.ScheduleTaskReq{
// })
// if err != nil {
// return err
// }
// var prevStatus string
// for {
// tsResp, err := pcmCli.GetTaskStatus(pcm.GetTaskStatusReq{
// NodeID: t.nodeID,
// PCMJobID: resp.PCMJobID,
// })
// if err != nil {
// return err
// }
// if tsResp.Status != prevStatus {
// ctx.reporter.Report(taskID, exectsk.NewScheduleTaskStatus(tsResp.Status, "", resp.PCMJobID))
// }
// prevStatus = tsResp.Status
// // TODO: adjust according to what the API actually returns in its result
// // Decide from the returned result whether the task has finished; if it has, leave the loop and end the task
// if tsResp.Status == "Completed" {
// return nil
// }
// }
return nil
}

View File

@ -0,0 +1,28 @@
package task
import (
"gitlink.org.cn/cloudream/common/pkgs/task"
reporter "gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
)
// TaskContext is the context value handed to every task run by the Manager.
type TaskContext struct {
// reporter is where tasks record their status (see GetScheduleScheme.Execute).
reporter *reporter.Reporter
}
// CompleteFn must be called explicitly once a Task has finished. The completing function is
// invoked while the Manager holds its lock, so it is a suitable place to set the execution result.
type CompleteFn = task.CompleteFn
// Manager, TaskBody and Task are the generic task types specialised for TaskContext.
type Manager = task.Manager[TaskContext]
type TaskBody = task.TaskBody[TaskContext]
type Task = task.Task[TaskContext]
// CompleteOption is the option value passed to a CompleteFn.
type CompleteOption = task.CompleteOption
// NewManager creates a task manager whose tasks share a TaskContext that
// carries the given reporter.
func NewManager(reporter *reporter.Reporter) Manager {
	shared := TaskContext{reporter: reporter}
	return task.NewManager(shared)
}

74
advisor/main.go Normal file
View File

@ -0,0 +1,74 @@
package main
import (
"fmt"
"os"
"time"
"gitlink.org.cn/cloudream/common/pkgs/logger"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/config"
myglbs "gitlink.org.cn/cloudream/scheduler/advisor/internal/globals"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/services"
"gitlink.org.cn/cloudream/scheduler/advisor/internal/task"
execmq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor"
)
// main wires the advisor process together: it loads the configuration,
// initialises logging and the advisor ID, builds the reporter, task manager
// and MQ server, starts the MQ server and the reporter in their own
// goroutines and then blocks forever.
//
// NOTE(review): the server is built with execmq (the executor MQ package)
// although this is the advisor binary — confirm whether the advisor MQ
// package was intended.
// NOTE(review): reporter.Serve acquires its client from the common globals'
// ManagerMQPool; nothing in this function initialises that pool — confirm it
// is set up elsewhere.
func main() {
	if err := config.Init(); err != nil {
		fmt.Printf("init config failed, err: %s", err.Error())
		os.Exit(1)
	}

	if err := logger.Init(&config.Cfg().Logger); err != nil {
		fmt.Printf("init logger failed, err: %s", err.Error())
		os.Exit(1)
	}

	myglbs.Init()

	interval := time.Duration(config.Cfg().ReportIntervalSec) * time.Second
	rpter := reporter.NewReporter(myglbs.AdvisorID, interval)
	taskMgr := task.NewManager(&rpter)

	svc := services.NewService(&taskMgr)
	mqSvr, err := execmq.NewServer(svc, &config.Cfg().RabbitMQ)
	if err != nil {
		logger.Fatalf("new advisor server failed, err: %s", err.Error())
	}
	mqSvr.OnError = func(err error) {
		logger.Warnf("advisor server err: %s", err.Error())
	}

	// Start the services.
	go serveMQServer(mqSvr)
	go serveReporter(&rpter)

	// Nothing ever sends on this channel, so the main goroutine parks here.
	forever := make(chan bool)
	<-forever
}
// serveMQServer runs the MQ server until Serve returns, logging the start,
// any error Serve returns, and the stop.
func serveMQServer(server *execmq.Server) {
	logger.Info("start serving mq server")

	if serveErr := server.Serve(); serveErr != nil {
		logger.Errorf("mq server stopped with error: %s", serveErr.Error())
	}

	logger.Info("mq server stopped")
}
// serveReporter runs the reporter until Serve returns, logging the start,
// any error Serve returns, and the stop.
func serveReporter(rpt *reporter.Reporter) {
	logger.Info("start serving reporter")

	if serveErr := rpt.Serve(); serveErr != nil {
		logger.Errorf("rpt stopped with error: %s", serveErr.Error())
	}

	logger.Info("rpt stopped")
}

View File

@ -0,0 +1,39 @@
package advisor
import (
"gitlink.org.cn/cloudream/common/models"
"gitlink.org.cn/cloudream/common/pkgs/mq"
)
// Get a schedule scheme.
// Registers Service.StartGetScheduleScheme as a message handler at package initialisation.
var _ = Register(Service.StartGetScheduleScheme)
// StartGetScheduleScheme is the request that starts a get-schedule-scheme task.
// It has no fields yet; the commented-out ones below are placeholders.
type StartGetScheduleScheme struct {
// UserID int64 `json:"userID"`
// PackageID int64 `json:"packageID"`
}
// NewStartGetScheduleScheme returns an empty StartGetScheduleScheme request.
func NewStartGetScheduleScheme() StartGetScheduleScheme {
return StartGetScheduleScheme{
// UserID: userID,
// PackageID: packageID,
}
}
// StartGetScheduleSchemeResp is the reply to StartGetScheduleScheme.
type StartGetScheduleSchemeResp struct {
// TaskID is the ID of the task that was started.
TaskID string `json:"taskID"`
}
// NewStartGetScheduleSchemeResp builds the reply carrying the given task ID.
func NewStartGetScheduleSchemeResp(taskID string) StartGetScheduleSchemeResp {
	var resp StartGetScheduleSchemeResp
	resp.TaskID = taskID
	return resp
}
// StartGetScheduleScheme sends msg through the client's RabbitMQ connection
// and returns the decoded StartGetScheduleSchemeResp, or the request error.
func (c *Client) StartGetScheduleScheme(msg StartGetScheduleScheme, opts ...mq.RequestOption) (*StartGetScheduleSchemeResp, error) {
	resp, err := mq.Request[StartGetScheduleSchemeResp](c.rabbitCli, msg, opts...)
	return resp, err
}
// init registers models.ResourceDataTypeUnion with the mq package at package load time.
func init() {
mq.RegisterUnionType(models.ResourceDataTypeUnion)
}

View File

@ -0,0 +1,59 @@
package advisor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
// Client sends requests to the advisor over RabbitMQ.
type Client struct {
// rabbitCli is the underlying RabbitMQ client created by NewClient.
rabbitCli *mq.RabbitMQClient
}
// NewClient opens a RabbitMQ client on the URL derived from cfg, targeting
// ServerQueueName, and wraps it in a Client.
// The error from mq.NewRabbitMQClient is returned unchanged.
func NewClient(cfg *mymq.Config) (*Client, error) {
	url := cfg.MakeConnectingURL()

	conn, err := mq.NewRabbitMQClient(url, ServerQueueName, "")
	if err != nil {
		return nil, err
	}

	cli := &Client{rabbitCli: conn}
	return cli, nil
}
// Close closes the underlying RabbitMQ client.
func (c *Client) Close() {
c.rabbitCli.Close()
}
// PoolClient is a Client handed out by a Pool; its Close gives the client back to that Pool.
type PoolClient struct {
*Client
// owner is the Pool that created this client.
owner *Pool
}
// Close releases the client to its owning Pool instead of closing it directly.
// It shadows the embedded Client.Close.
func (c *PoolClient) Close() {
c.owner.Release(c)
}
// Pool hands out advisor clients. It currently keeps no clients around:
// Acquire creates a new one each time and Release closes it.
type Pool struct {
// mqcfg is the message-queue configuration used for every new client.
mqcfg *mymq.Config
}
// NewPool returns a Pool that creates its clients from mqcfg.
func NewPool(mqcfg *mymq.Config) *Pool {
	pool := new(Pool)
	pool.mqcfg = mqcfg
	return pool
}
// Acquire creates a new Client from the pool's configuration and wraps it in
// a PoolClient owned by this pool. The error from NewClient is returned unchanged.
func (p *Pool) Acquire() (*PoolClient, error) {
	inner, err := NewClient(p.mqcfg)
	if err != nil {
		return nil, err
	}

	handle := new(PoolClient)
	handle.Client = inner
	handle.owner = p
	return handle, nil
}
// Release closes the underlying Client of cli; the client is not kept for reuse.
func (p *Pool) Release(cli *PoolClient) {
cli.Client.Close()
}

View File

@ -0,0 +1,70 @@
package advisor
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
mymq "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq"
)
const (
// ServerQueueName is the queue name this package passes to both mq.NewRabbitMQServer and
// mq.NewRabbitMQClient.
// NOTE(review): the value says "Collector" although this is the advisor package — confirm it
// is not a copy-paste leftover that makes the advisor share a queue with another service.
ServerQueueName = "Scheduler-Collector"
)
// Service lists the message handlers an advisor implementation must provide.
type Service interface {
// StartGetScheduleScheme handles a StartGetScheduleScheme request.
StartGetScheduleScheme(msg *StartGetScheduleScheme) (*StartGetScheduleSchemeResp, *mq.CodeMessage)
}
// Server receives advisor messages from RabbitMQ and dispatches them to a Service.
type Server struct {
// service is the handler implementation messages are dispatched to.
service Service
// rabbitSvr is stored by value; NewServer copies it from the pointer mq.NewRabbitMQServer returns.
rabbitSvr mq.RabbitMQServer
// OnError is an error callback set by the owner of the Server; nothing in this block calls it.
OnError func(err error)
}
// NewServer creates a Server that listens on ServerQueueName at the URL
// derived from cfg and routes every incoming message through msgDispatcher
// to svc. The error from mq.NewRabbitMQServer is returned unchanged.
func NewServer(svc Service, cfg *mymq.Config) (*Server, error) {
	srv := &Server{service: svc}

	// The handler reads srv.service at call time, so it captures srv rather than svc.
	dispatch := func(msg *mq.Message) (*mq.Message, error) {
		return msgDispatcher.Handle(srv.service, msg)
	}

	inner, err := mq.NewRabbitMQServer(cfg.MakeConnectingURL(), ServerQueueName, dispatch)
	if err != nil {
		return nil, err
	}

	srv.rabbitSvr = *inner
	return srv, nil
}
// Stop closes the underlying RabbitMQ server.
func (s *Server) Stop() {
s.rabbitSvr.Close()
}
// Serve runs the underlying RabbitMQ server and returns whatever its Serve returns.
func (s *Server) Serve() error {
return s.rabbitSvr.Serve()
}
// msgDispatcher is the package-level dispatcher that Register and RegisterNoReply add handlers to
// and that NewServer uses to handle incoming messages.
var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
// Register installs one of Service's interface functions as the handler for messages of the given
// request type, and also registers the request and response message types.
// It always returns nil so that it can be used in a package-level `var _ = Register(...)`.
// TODO: need to constrain Service to implement the TSvc interface.
func Register[TSvc any, TReq any, TResp any](svcFn func(svc TSvc, msg *TReq) (*TResp, *mq.CodeMessage)) any {
mq.AddServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
mq.RegisterMessage[TResp]()
return nil
}
// RegisterNoReply installs one of Service's interface functions *that has no return value* as the
// handler for messages of the given request type, and registers the request message type.
// It always returns nil so that it can be used in a package-level `var _ = RegisterNoReply(...)`.
// TODO: need to constrain Service to implement the TSvc interface.
func RegisterNoReply[TSvc any, TReq any](svcFn func(svc TSvc, msg *TReq)) any {
mq.AddNoRespServiceFn(&msgDispatcher, svcFn)
mq.RegisterMessage[TReq]()
return nil
}

View File

@ -0,0 +1,73 @@
package task
// TaskStatus is the type under which task statuses are stored and transported.
// It is the empty interface, so it accepts any value.
type TaskStatus interface{}
// TaskStatusConst is a type constraint for generic helpers that accept a task status.
// Because the union includes TaskStatus (the empty interface), the constraint admits every type;
// the remaining terms only name some of the status types.
type TaskStatusConst interface {
TaskStatus | ScheduleTaskStatus | UploadImageTaskStatus
}
// ScheduleTaskStatus is the status of a schedule task.
type ScheduleTaskStatus struct {
// Status is the status text of the task.
Status string `json:"status"`
// Error is the error text.
Error string `json:"error"`
// PCMJobID is the PCM job ID associated with the task.
PCMJobID int64 `json:"pcmJobID"`
}
// NewScheduleTaskStatus builds a ScheduleTaskStatus from a status text, an
// error text and a PCM job ID.
func NewScheduleTaskStatus(status string, err string, pcmJobID int64) ScheduleTaskStatus {
	var st ScheduleTaskStatus
	st.Status = status
	st.Error = err
	st.PCMJobID = pcmJobID
	return st
}
// UploadImageTaskStatus is the status of an upload-image task.
type UploadImageTaskStatus struct {
// Status is the status text of the task.
Status string `json:"status"`
// Error is the error text.
Error string `json:"error"`
// ImageID is the image ID associated with the task.
ImageID int64 `json:"imageID"`
}
// NewUploadImageTaskStatus builds an UploadImageTaskStatus from a status text,
// an error text and an image ID.
func NewUploadImageTaskStatus(status string, err string, imageID int64) UploadImageTaskStatus {
	var st UploadImageTaskStatus
	st.Status = status
	st.Error = err
	st.ImageID = imageID
	return st
}
// CacheMovePackageTaskStatus is the status of a cache-move-package task.
type CacheMovePackageTaskStatus struct {
// Status is the status text of the task.
Status string `json:"status"`
// Error is the error text.
Error string `json:"error"`
}
// NewCacheMovePackageTaskStatus builds a CacheMovePackageTaskStatus from a
// status text and an error text.
func NewCacheMovePackageTaskStatus(status string, err string) CacheMovePackageTaskStatus {
	var st CacheMovePackageTaskStatus
	st.Status = status
	st.Error = err
	return st
}
// CreatePackageTaskStatus is the status of a create-package task.
type CreatePackageTaskStatus struct {
// Status is the status text of the task.
Status string `json:"status"`
// Error is the error text.
Error string `json:"error"`
// PackageID is the package ID associated with the task.
PackageID int64 `json:"packageID"`
}
// NewCreatePackageTaskStatus builds a CreatePackageTaskStatus from a status
// text, an error text and a package ID.
func NewCreatePackageTaskStatus(status string, err string, packageID int64) CreatePackageTaskStatus {
	var st CreatePackageTaskStatus
	st.Status = status
	st.Error = err
	st.PackageID = packageID
	return st
}
// LoadPackageTaskStatus is the status of a load-package task.
type LoadPackageTaskStatus struct {
// Status is the status text of the task.
Status string `json:"status"`
// Error is the error text.
Error string `json:"error"`
}
// NewLoadPackageTaskStatus builds a LoadPackageTaskStatus from a status text
// and an error text.
func NewLoadPackageTaskStatus(status string, err string) LoadPackageTaskStatus {
	var st LoadPackageTaskStatus
	st.Status = status
	st.Error = err
	return st
}

View File

@ -2,32 +2,33 @@ package manager
import (
"gitlink.org.cn/cloudream/common/pkgs/mq"
advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
exectsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/executor/task"
)
// 接收executor上报的存活状态及任务执行情况
var _ = Register(Service.ReportTaskStatus)
var _ = Register(Service.ReportExecutorTaskStatus)
type ReportTaskStatus struct {
type ReportExecutorTaskStatus struct {
ExecutorID string `json:"executorID"`
TaskStatus []ExecutorTaskStatus `json:"taskStatus"`
}
type ReportTaskStatusResp struct {
type ReportExecutorTaskStatusResp struct {
}
// ExecutorTaskStatus pairs a task ID with the status an executor reports for it.
// The fields carry no JSON tags, unlike the surrounding message structs.
type ExecutorTaskStatus struct {
TaskID string
Status exectsk.TaskStatus
}
func NewReportTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) ReportTaskStatus {
return ReportTaskStatus{
func NewReportExecutorTaskStatus(executorID string, taskStatus []ExecutorTaskStatus) ReportExecutorTaskStatus {
return ReportExecutorTaskStatus{
ExecutorID: executorID,
TaskStatus: taskStatus,
}
}
func NewReportTaskStatusResp() ReportTaskStatusResp {
return ReportTaskStatusResp{}
func NewReportExecutorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
}
func NewExecutorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) ExecutorTaskStatus {
return ExecutorTaskStatus{
@ -35,6 +36,40 @@ func NewExecutorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) E
Status: status,
}
}
func (c *Client) ReportTaskStatus(msg ReportTaskStatus, opts ...mq.RequestOption) (*ReportTaskStatusResp, error) {
return mq.Request[ReportTaskStatusResp](c.rabbitCli, msg, opts...)
func (c *Client) ReportExecutorTaskStatus(msg ReportExecutorTaskStatus, opts ...mq.RequestOption) (*ReportExecutorTaskStatusResp, error) {
return mq.Request[ReportExecutorTaskStatusResp](c.rabbitCli, msg, opts...)
}
// Receives the liveness state and task execution status reported by an advisor.
var _ = Register(Service.ReportAdvisorTaskStatus)
// ReportAdvisorTaskStatus is the message an advisor sends to report its task statuses.
type ReportAdvisorTaskStatus struct {
// AdvisorID identifies the reporting advisor.
AdvisorID string `json:"advisorID"`
// TaskStatus lists the task statuses carried by this report.
TaskStatus []AdvisorTaskStatus `json:"taskStatus"`
}
// ReportAdvisorTaskStatusResp is the (empty) reply to ReportAdvisorTaskStatus.
type ReportAdvisorTaskStatusResp struct {
}
// AdvisorTaskStatus pairs a task ID with the status an advisor reports for it.
// The fields carry no JSON tags, unlike the surrounding message structs.
type AdvisorTaskStatus struct {
TaskID string
Status advtsk.TaskStatus
}
// NewReportAdvisorTaskStatus builds the report message for the given advisor
// ID and task statuses. The slice is stored as-is, not copied.
func NewReportAdvisorTaskStatus(advisorID string, taskStatus []AdvisorTaskStatus) ReportAdvisorTaskStatus {
	var report ReportAdvisorTaskStatus
	report.AdvisorID = advisorID
	report.TaskStatus = taskStatus
	return report
}
func NewReportAdvisorTaskStatusResp() ReportExecutorTaskStatusResp {
return ReportExecutorTaskStatusResp{}
}
func NewAdvisorTaskStatus[T exectsk.TaskStatusConst](taskID string, status T) AdvisorTaskStatus {
return AdvisorTaskStatus{
TaskID: taskID,
Status: status,
}
}
// ReportAdvisorTaskStatus sends msg through the client's RabbitMQ connection
// and returns the decoded ReportAdvisorTaskStatusResp, or the request error.
func (c *Client) ReportAdvisorTaskStatus(msg ReportAdvisorTaskStatus, opts ...mq.RequestOption) (*ReportAdvisorTaskStatusResp, error) {
	resp, err := mq.Request[ReportAdvisorTaskStatusResp](c.rabbitCli, msg, opts...)
	return resp, err
}

View File

@ -10,7 +10,8 @@ const (
)
type Service interface {
ReportTaskStatus(msg *ReportTaskStatus) (*ReportTaskStatusResp, *mq.CodeMessage)
ReportExecutorTaskStatus(msg *ReportExecutorTaskStatus) (*ReportExecutorTaskStatusResp, *mq.CodeMessage)
ReportAdvisorTaskStatus(msg *ReportAdvisorTaskStatus) (*ReportAdvisorTaskStatusResp, *mq.CodeMessage)
}
type Server struct {

View File

@ -1,2 +1,2 @@
# scheduler-dcontroller
# scheduler-executor

View File

@ -70,7 +70,7 @@ func (r *Reporter) Serve() error {
r.taskStatus = make(map[string]exectsk.TaskStatus)
r.taskStatusLock.Unlock()
_, err := magCli.ReportTaskStatus(mgrmq.NewReportTaskStatus(r.executorID, taskStatus))
_, err := magCli.ReportExecutorTaskStatus(mgrmq.NewReportExecutorTaskStatus(r.executorID, taskStatus))
if err != nil {
logger.Warnf("reporting to manager: %s", err.Error())