pcm-coordinator/pkg/scheduler/scheduler.go

212 lines
5.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
Copyright (c) [2023] [pcm]
[pcm-coordinator] is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
package scheduler
import (
	"encoding/json"
	"strconv"
	"strings"

	"github.com/pkg/errors"
	"github.com/zeromicro/go-zero/core/logx"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algorithm/providerPricing"
	"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
	"gorm.io/gorm"
	"sigs.k8s.io/yaml"
)
// scheduler carries the state of one scheduling run: the task decoded from the
// request payload, the candidate participants selected for it, and the
// collaborators used to choose and persist a placement.
type scheduler struct {
// task is the task description decoded from JSON by NewScheduler.
task *response.TaskInfo
// participantIds holds the candidate participant IDs filled in by
// SpecifyClusters, SpecifyNsID or MatchLabels.
participantIds []int64
// scheduleService provides strategy selection and the DB-record builder.
scheduleService scheduleService
// dbEngin is the gorm handle used for lookups and for persisting records.
dbEngin *gorm.DB
// result is written by assignReplicasToResult, indexed by a provider's Pid.
// Original note: "pID : sub-task YAML string" key/value pairs.
// NOTE(review): declared as a slice although described as key/value pairs,
// and never allocated in this file — confirm the intended type.
result []string
// participantRpc is stored by NewScheduler; it is not used in this file.
participantRpc participantservice.ParticipantService
}
// NewScheduler decodes the JSON task description in val and returns a
// scheduler wired to the given schedule service, database handle and
// participant RPC client.
//
// An error is returned when val cannot be decoded as a task, or when it
// decodes to no task at all (for example the JSON literal null). The
// scheduler's methods dereference the task, so a nil task is rejected here
// instead of panicking later.
func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB, participantRpc participantservice.ParticipantService) (*scheduler, error) {
	var task *response.TaskInfo
	if err := json.Unmarshal([]byte(val), &task); err != nil {
		return nil, errors.New("create scheduler failed : " + err.Error())
	}
	// json.Unmarshal accepts "null" without error and leaves the pointer nil.
	if task == nil {
		return nil, errors.New("create scheduler failed : task is empty")
	}
	return &scheduler{
		task:            task,
		scheduleService: scheduleService,
		dbEngin:         dbEngin,
		participantRpc:  participantRpc,
	}, nil
}
// SpecifyClusters resolves the cluster names listed on the task into
// participant IDs and stores them on the scheduler. It does nothing when the
// task names no clusters. The query's error is not inspected.
func (s *scheduler) SpecifyClusters() {
	if len(s.task.Clusters) == 0 {
		return
	}
	// Cluster names were given explicitly: look up their participant IDs.
	const query = "select id from sc_participant_phy_info where `name` in (?)"
	s.dbEngin.Raw(query, s.task.Clusters).Scan(&s.participantIds)
}
// SpecifyNsID resolves the task's NsID into participant IDs. It applies only
// when the task names no clusters but does carry an NsID: the tenant's
// comma-separated cluster list is read from sc_tenant_info and the matching
// participant IDs are stored on the scheduler.
//
// The method has no error return, so when the tenant lookup fails or yields
// no clusters it leaves participantIds untouched and returns.
func (s *scheduler) SpecifyNsID() {
	if len(s.task.Clusters) != 0 || len(s.task.NsID) == 0 {
		return
	}

	var clusters string
	err := s.dbEngin.Raw("select clusters from sc_tenant_info where `tenant_name` = ?", s.task.NsID).Scan(&clusters).Error
	// strings.Split("", ",") returns [""], which would trigger a pointless
	// lookup for a participant with an empty name; stop here instead.
	if err != nil || clusters == "" {
		return
	}

	clusterArr := strings.Split(clusters, ",")
	s.dbEngin.Raw("select id from sc_participant_phy_info where `name` in (?)", clusterArr).Scan(&s.participantIds)
}
// MatchLabels selects participants by label when the task specifies neither
// clusters nor an NsID. For every key/value pair in the task's MatchLabels it
// queries sc_participant_label_info and keeps only the participant IDs common
// to all labels; the outcome replaces the scheduler's participantIds.
func (s *scheduler) MatchLabels() {
	// Label matching is the fallback: skip it if clusters or NsID were given.
	if len(s.task.Clusters) != 0 || len(s.task.NsID) != 0 {
		return
	}

	var matched []int64
	seeded := false
	for key, value := range s.task.MatchLabels {
		var labelled []int64
		s.dbEngin.Raw("select participant_id from sc_participant_label_info where `key` = ? and value = ?", key, value).Scan(&labelled)
		// The first label seeds the running set; it is then intersected with
		// itself, exactly like every later label.
		if !seeded {
			matched = labelled
			seeded = true
		}
		matched = intersect(matched, labelled)
	}
	s.participantIds = matched
}
// TempAssign rewrites every entry of the task's Metadata from YAML to its JSON
// encoding. TODO: this currently stands in for the original scheduling
// algorithm.
//
// It returns an error if an entry is not valid YAML or cannot be encoded as
// JSON; in that case the task's Metadata is left unchanged.
//
// TODO (from the original notes): inspect the resource type in the task and
// handle each kind found in the metadata separately. Given the replica count
// and the cluster list, the final output should map each pID to its replica
// count, which requires splitting and re-assembling multi-kind documents.
func (s *scheduler) TempAssign() error {
	var meData []string
	for _, yamlString := range s.task.Metadata {
		var data map[string]interface{}
		if err := yaml.Unmarshal([]byte(yamlString), &data); err != nil {
			return errors.New("parse task metadata as yaml failed : " + err.Error())
		}
		jsonData, err := json.Marshal(data)
		if err != nil {
			return errors.New("encode task metadata as json failed : " + err.Error())
		}
		meData = append(meData, string(jsonData))
	}
	// Only replace the metadata once every document converted cleanly.
	s.task.Metadata = meData
	return nil
}
// AssignAndSchedule decides where the task runs.
//
// If the task already has a ParticipantId nothing is done. With no candidate
// participants an error is returned; with exactly one, it is assigned
// directly. Otherwise the strategy parameters are built, and if fewer than two
// providers come back the first candidate is assigned; else the schedule
// service picks a strategy and its solution is recorded through
// assignReplicasToResult.
func (s *scheduler) AssignAndSchedule() error {
	// A participant was already specified for this task.
	if s.task.ParticipantId != 0 {
		return nil
	}

	switch len(s.participantIds) {
	case 0:
		// Neither explicit selection nor label matching produced a candidate.
		return errors.New("未找到匹配的ParticipantIds")
	case 1:
		// A single candidate needs no scheduling.
		s.task.ParticipantId = s.participantIds[0]
		return nil
	}

	// Build the inputs required by the scheduling strategy.
	task, providerList, err := s.obtainParamsForStrategy()
	if err != nil {
		return err
	}

	// Too few providers to schedule across: use the first candidate.
	if len(providerList) < 2 {
		s.task.ParticipantId = s.participantIds[0]
		return nil
	}

	// Run the scheduling algorithm and record its outcome.
	strategy, err := s.scheduleService.pickOptimalStrategy(task, providerList...)
	if err != nil {
		return err
	}
	return s.assignReplicasToResult(strategy, providerList)
}
// SaveToDb persists one record per (participant, metadata entry) pair. Each
// record is built by the schedule service and inserted through gorm. The
// first build or insert error aborts the loop and is returned; insert errors
// are also logged.
func (s *scheduler) SaveToDb() error {
	for _, pid := range s.participantIds {
		for _, doc := range s.task.Metadata {
			record, err := s.scheduleService.getNewStructForDb(s.task, doc, pid)
			if err != nil {
				return err
			}
			if dbErr := s.dbEngin.Create(record).Error; dbErr != nil {
				logx.Error(dbErr)
				return dbErr
			}
		}
	}
	return nil
}
// obtainParamsForStrategy asks the schedule service for the strategy task and
// the provider list derived from the scheduler's task and database handle. An
// empty provider list is reported as an error.
func (s *scheduler) obtainParamsForStrategy() (*providerPricing.Task, []*providerPricing.Provider, error) {
	strategyTask, providers := s.scheduleService.genTaskAndProviders(s.task, s.dbEngin)
	if len(providers) > 0 {
		return strategyTask, providers, nil
	}
	return nil, nil, errors.New("获取集群失败")
}
// assignReplicasToResult records the strategy's solution on the scheduler.
// Entry i of strategy.Tasksolution belongs to providerList[i]; every non-zero
// entry is stored, as decimal text, in s.result at the index given by that
// provider's Pid.
//
// It returns an error when the solution is empty, when a solution entry has
// no matching provider, when s.result has no slot for a provider's Pid, or
// when s.result is still empty after processing.
//
// NOTE(review): s.result is a []string that is never allocated in this file,
// so the slot check fails for any non-zero entry. A map keyed by Pid may be
// what was intended — confirm against the struct definition and callers.
func (s *scheduler) assignReplicasToResult(strategy *providerPricing.Strategy, providerList []*providerPricing.Provider) error {
	if len(strategy.Tasksolution) == 0 {
		return errors.New("调度失败, 未能获取调度结果")
	}

	for i, replicas := range strategy.Tasksolution {
		if replicas == 0 {
			continue
		}
		// Guard the provider lookup instead of panicking on a short list.
		if i >= len(providerList) {
			return errors.New("schedule failed: solution has more entries than providers")
		}
		slot := int64(providerList[i].Pid)
		// Guard the result index instead of panicking on a missing slot.
		if slot < 0 || slot >= int64(len(s.result)) {
			return errors.New("schedule failed: no result slot for participant " + strconv.FormatInt(slot, 10))
		}
		// string(n) on an integer yields the Unicode code point n, not its
		// decimal text, so format the number explicitly.
		s.result[slot] = strconv.FormatInt(int64(replicas), 10)
	}

	if len(s.result) == 0 {
		return errors.New("可用集群为空")
	}
	return nil
}