This commit is contained in:
zhangwei 2023-11-29 19:09:07 +08:00
parent 8710013a83
commit 5e7d1dc22a
11 changed files with 33 additions and 53 deletions

View File

@ -180,10 +180,10 @@ type (
Description string `yaml:"description"`
tenantId int64 `yaml:"tenantId"`
nsID string `yaml:"nsID"`
replicas int64 `yaml:"replicas"`
tasks []TaskYaml `yaml:"tasks"`
}
TaskYaml {
replicas int64 `yaml:"replicas"`
TaskId int64 `yaml:"taskId"`
nsID string `yaml:"nsID"`
taskType string `yaml:"taskType"`

View File

@ -7,7 +7,7 @@ NacosConfig:
- IpAddr: 119.45.100.73
Port: 8848
ClientConfig:
NamespaceId: zhouqj
NamespaceId: zw
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir:

View File

@ -66,6 +66,7 @@ func (l *ScheduleTaskByYamlLogic) ScheduleTaskByYaml(req *types.ScheduleTaskByYa
for _, task := range req.Tasks {
task.NsID = req.NsID
task.TaskId = taskModel.Id
// 将任务数据转换成消息体
reqMessage, err := json.Marshal(task)
if err != nil {

View File

@ -161,11 +161,11 @@ type ScheduleTaskByYamlReq struct {
Description string `yaml:"description"`
TenantId int64 `yaml:"tenantId"`
NsID string `yaml:"nsID"`
Replicas int64 `yaml:"replicas"`
Tasks []TaskYaml `yaml:"tasks"`
}
type TaskYaml struct {
Replicas int64 `yaml:"replicas"`
TaskId int64 `yaml:"taskId"`
NsID string `yaml:"nsID"`
TaskType string `yaml:"taskType"`

View File

@ -30,7 +30,7 @@ func NewAiScheduler(val string) *aiScheduler {
return &aiScheduler{yamlString: val}
}
func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) {
ai := models.Ai{
ParticipantId: participantId,
TaskId: task.TaskId,

View File

@ -23,7 +23,6 @@ import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gorm.io/gorm"
"io"
v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
@ -48,34 +47,15 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg
return taskResult.MaxscoreStrategy, nil
}
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) {
bytes, err := json.Marshal(task.Metadata)
var bytesNew []byte
//replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
bytes, err := json.Marshal(resource)
if err != nil {
return nil, err
}
cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId, task.NsID)
switch cloud.Kind {
case "Deployment":
deployment := v1.Deployment{}
json.Unmarshal(bytes, &deployment)
deployment.Spec.Replicas = &replica
deployment.Namespace = cloud.NsID
bytesNew, _ = json.Marshal(deployment)
case "StatefulSet":
statefulSet := v1.StatefulSet{}
json.Unmarshal(bytes, &statefulSet)
statefulSet.Spec.Replicas = &replica
statefulSet.Namespace = cloud.NsID
bytesNew, _ = json.Marshal(statefulSet)
}
cloud.Replica = replica
cloud.Id = utils.GenSnowflakeID()
cloud.YamlString = string(bytesNew)
cloud.YamlString = string(bytes)
cloud.NsID = task.NsID
cloud.ParticipantId = participantId

View File

@ -23,7 +23,7 @@ import (
)
type scheduleService interface {
getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error)
getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error)
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error)
genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider)
}

View File

@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *hpcScheduler {
return &hpcScheduler{yamlString: val}
}
func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, participantId int64, replica int32) (interface{}, error) {
func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) {
hpc := models.Hpc{}
utils.Convert(task.Metadata, &hpc)
hpc.Id = utils.GenSnowflakeID()

View File

@ -16,7 +16,6 @@ package scheduler
import (
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
@ -33,7 +32,7 @@ type scheduler struct {
participantIds []int64
scheduleService scheduleService
dbEngin *gorm.DB
result map[int64]string //pID:子任务yamlstring 键值对
result []string //pID:子任务yamlstring 键值对
participantRpc participantservice.ParticipantService
}
@ -43,7 +42,7 @@ func NewScheduler(scheduleService scheduleService, val string, dbEngin *gorm.DB,
if err != nil {
return nil, errors.New("create scheduler failed : " + err.Error())
}
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc, result: make(map[int64]string, 0)}, nil
return &scheduler{task: task, scheduleService: scheduleService, dbEngin: dbEngin, participantRpc: participantRpc}, nil
}
func (s *scheduler) SpecifyClusters() {
@ -100,10 +99,11 @@ func (s *scheduler) TempAssign() error {
var resources []interface{}
tool.Convert(s.task.Metadata, &resources)
for _, resource := range resources {
for index := range resources {
//如果是Deployment需要对副本数做分发
if resource.(map[string]interface{})["kind"].(string) == "Deployment" || resource.(map[string]interface{})["kind"].(string) == "Replicaset" {
resource.(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas
if resources[index].(map[string]interface{})["kind"].(string) == "Deployment" || resources[index].(map[string]interface{})["kind"].(string) == "Replicaset" ||
resources[index].(map[string]interface{})["kind"].(string) == "StatefulSet" {
resources[index].(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas
}
}
s.task.Metadata = resources
@ -126,7 +126,7 @@ func (s *scheduler) AssignAndSchedule() error {
replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
result := make(map[int64]string)
result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
s.result = result
//s.result = result
return nil
}
@ -160,22 +160,22 @@ func (s *scheduler) AssignAndSchedule() error {
func (s *scheduler) SaveToDb() error {
for key, value := range s.result {
num, err := strconv.Atoi(value)
if err != nil {
fmt.Println("转换失败:", err)
for _, participantId := range s.participantIds {
var resources []interface{}
tool.Convert(s.task.Metadata, &resources)
for _, resource := range resources {
structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, int64(participantId))
if err != nil {
return err
}
}
structForDb, err := s.scheduleService.getNewStructForDb(s.task, int64(key), int32(num))
if err != nil {
return err
tx := s.dbEngin.Create(structForDb)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
}
}
tx := s.dbEngin.Create(structForDb)
if tx.Error != nil {
logx.Error(tx.Error)
return tx.Error
}
}
return nil
}

View File

@ -30,8 +30,7 @@ func TestGetNamedMetrics(t *testing.T) {
client, _ := NewPrometheus("http://10.105.20.4:30766")
result := client.GetNamedMetricsByTime([]string{"pod_cpu_usage", "pod_memory_usage_wo_cache"}, "1700521446", "1700551446", 10*time.Minute, ControllerOption{
PodsName: "notification-manager-deployment-78664576cb-vkptn|notification-manager-deployment-78664576cb-5m6mt",
Namespace: "kubesphere-monitoring-system",
PodsName: "sealos-task-112703-65c776b4b5-q4jgf",
})
println("zzz", result)
}

View File

@ -4,10 +4,10 @@ NacosConfig:
ServerConfigs:
# - IpAddr: 127.0.0.1
# Port: 8848
- IpAddr: nacos.jcce.dev
- IpAddr: 119.45.100.73
Port: 8848
ClientConfig:
NamespaceId: test
NamespaceId: zw
TimeoutMs: 5000
NotLoadCacheAtStart: true
LogDir: