新增提交任务接口

This commit is contained in:
zhangwei 2023-12-06 21:03:01 +08:00
parent 6519222177
commit 2e52eb0850
14 changed files with 174 additions and 37 deletions

View File

@ -174,12 +174,20 @@ type deleteTaskReq {
Id int64 `path:"id"`
}
type commitTaskReq{
Name string `json:"name"`
NsID string `json:"nsID"`
Replicas int64 `json:"replicas"`
MatchLabels map[string]string `json:"matchLabels"`
YamlList []string `json:"yamlList"`
}
type (
scheduleTaskByYamlReq {
Name string `yaml:"name"`
Description string `yaml:"description"`
tenantId int64 `yaml:"tenantId"`
nsID string `yaml:"nsID"`
nsID string `form:"nsID"`
tasks []TaskYaml `yaml:"tasks"`
}
TaskYaml {

View File

@ -30,7 +30,11 @@ service pcm {
@doc "yaml提交任务"
@handler scheduleTaskByYamlHandler
post /core/scheduleTaskByYaml (scheduleTaskByYamlReq) returns (scheduleTaskByYamlResp)
post /core/scheduleTaskByYaml (scheduleTaskByYamlReq)
@doc "提交任务"
@handler commitTaskHandler
post /core/commitTask (commitTaskReq)
@doc "提交超算任务"
@handler commitHpcTaskHandler

View File

@ -0,0 +1,25 @@
package core
import (
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/repository/result"
"net/http"
"github.com/zeromicro/go-zero/rest/httpx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/logic/core"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
)
func CommitTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
var req types.CommitTaskReq
if err := httpx.Parse(r, &req); err != nil {
httpx.ErrorCtx(r.Context(), w, err)
return
}
l := core.NewCommitTaskLogic(r.Context(), svcCtx)
err := l.CommitTask(&req)
result.HttpResult(r, w, nil, err)
}
}

View File

@ -31,6 +31,11 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) {
Path: "/core/scheduleTaskByYaml",
Handler: core.ScheduleTaskByYamlHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/core/commitTask",
Handler: core.CommitTaskHandler(serverCtx),
},
{
Method: http.MethodPost,
Path: "/core/commitHpcTask",

View File

@ -48,7 +48,7 @@ func (l *CommitHpcTaskLogic) CommitHpcTask(req *types.CommitHpcTaskReq) (resp *t
TaskId: taskModel.Id,
TaskType: "hpc",
MatchLabels: req.MatchLabels,
Metadata: hpc,
//Metadata: hpc,
}
req.TaskId = taskModel.Id
// 将任务数据转换成消息体

View File

@ -0,0 +1,63 @@
package core
import (
"context"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/constants"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"k8s.io/apimachinery/pkg/util/json"
"time"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/internal/types"
"github.com/zeromicro/go-zero/core/logx"
)
type CommitTaskLogic struct {
logx.Logger
ctx context.Context
svcCtx *svc.ServiceContext
}
func NewCommitTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CommitTaskLogic {
return &CommitTaskLogic{
Logger: logx.WithContext(ctx),
ctx: ctx,
svcCtx: svcCtx,
}
}
func (l *CommitTaskLogic) CommitTask(req *types.CommitTaskReq) error {
taskModel := models.Task{
Status: constants.Saved,
Name: req.Name,
CommitTime: time.Now(),
NsID: req.NsID,
}
// 保存任务数据到数据库
tx := l.svcCtx.DbEngin.Create(&taskModel)
if tx.Error != nil {
return tx.Error
}
task := response.TaskInfo{
TaskId: taskModel.Id,
MatchLabels: req.MatchLabels,
NsID: req.NsID,
Metadata: req.YamlList,
Replicas: req.Replicas,
}
// 将任务数据转换成消息体
reqMessage, err := json.Marshal(task)
if err != nil {
logx.Error(err)
return err
}
publish := l.svcCtx.RedisClient.Publish(context.Background(), "cloud", reqMessage)
if publish.Err() != nil {
return publish.Err()
}
return nil
}

View File

@ -156,11 +156,19 @@ type DeleteTaskReq struct {
Id int64 `path:"id"`
}
type CommitTaskReq struct {
Name string `json:"name"`
NsID string `json:"nsID"`
Replicas int64 `json:"replicas"`
MatchLabels map[string]string `json:"matchLabels"`
YamlList []string `json:"yamlList"`
}
type ScheduleTaskByYamlReq struct {
Name string `yaml:"name"`
Description string `yaml:"description"`
TenantId int64 `yaml:"tenantId"`
NsID string `yaml:"nsID"`
NsID string `form:"nsID"`
Tasks []TaskYaml `yaml:"tasks"`
}

View File

@ -25,7 +25,7 @@ type TaskInfo struct {
Clusters []string `json:"clusters,optional"` //云际平台传入集群名称列表
TenantId int64 `json:"tenantId"`
Replicas int64 `json:"replicas"`
Metadata interface{} `json:"metadata"`
Metadata []string `json:"metadata"`
}
func (t *TaskInfo) Validate() error {

View File

@ -30,7 +30,7 @@ func NewAiScheduler(val string) *aiScheduler {
return &aiScheduler{yamlString: val}
}
func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) {
func (as *aiScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
ai := models.Ai{
ParticipantId: participantId,
TaskId: task.TaskId,

View File

@ -16,7 +16,6 @@ package scheduler
import (
"bytes"
"encoding/json"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/models"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
@ -47,15 +46,10 @@ func (cs *cloudScheduler) pickOptimalStrategy(task *algo.Task, providers ...*alg
return taskResult.MaxscoreStrategy, nil
}
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) {
bytes, err := json.Marshal(resource)
if err != nil {
return nil, err
}
cloud := cs.UnMarshalK8sStruct(string(bytes), task.TaskId, task.NsID)
func (cs *cloudScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
cloud := cs.UnMarshalK8sStruct(resource, task.TaskId, task.NsID)
cloud.Id = utils.GenSnowflakeID()
cloud.YamlString = string(bytes)
cloud.YamlString = resource
cloud.NsID = task.NsID
cloud.ParticipantId = participantId
@ -114,8 +108,8 @@ func (cs *cloudScheduler) genTaskAndProviders(task *response.TaskInfo, dbEngin *
providerList = append(providerList, provider)
}
replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
t := algo.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000)
//replicas := task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
//t := algo.NewTask(0, int(replicas), 2, 75120000, 301214500, 1200, 2, 6, 2000)
return t, providerList
return nil, providerList
}

View File

@ -23,7 +23,7 @@ import (
)
type scheduleService interface {
getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error)
getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error)
pickOptimalStrategy(task *algo.Task, providers ...*algo.Provider) (*algo.Strategy, error)
genTaskAndProviders(task *response.TaskInfo, dbEngin *gorm.DB) (*algo.Task, []*algo.Provider)
}

View File

@ -31,7 +31,7 @@ func NewHpcScheduler(val string) *hpcScheduler {
return &hpcScheduler{yamlString: val}
}
func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource interface{}, participantId int64) (interface{}, error) {
func (h *hpcScheduler) getNewStructForDb(task *response.TaskInfo, resource string, participantId int64) (interface{}, error) {
hpc := models.Hpc{}
utils.Convert(task.Metadata, &hpc)
hpc.Id = utils.GenSnowflakeID()

View File

@ -20,10 +20,9 @@ import (
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/api/pkg/response"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/scheduler/algo"
tool "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/participantservice"
"gorm.io/gorm"
"strconv"
"sigs.k8s.io/yaml"
"strings"
)
@ -96,17 +95,19 @@ func (s *scheduler) TempAssign() error {
//需要判断task中的资源类型针对metadata中的多个kind做不同处理
//输入副本数和集群列表最终结果输出为pID对应副本数量列表针对多个kind需要做拆分和重新拼接组合
var resources []interface{}
tool.Convert(s.task.Metadata, &resources)
for index := range resources {
//如果是Deployment需要对副本数做分发
if resources[index].(map[string]interface{})["kind"].(string) == "Deployment" || resources[index].(map[string]interface{})["kind"].(string) == "Replicaset" ||
resources[index].(map[string]interface{})["kind"].(string) == "StatefulSet" {
resources[index].(map[string]interface{})["spec"].(map[string]interface{})["replicas"] = s.task.Replicas
var meData []string
for _, yamlString := range s.task.Metadata {
var data map[string]interface{}
err := yaml.Unmarshal([]byte(yamlString), &data)
if err != nil {
}
jsonData, err := json.Marshal(data)
if err != nil {
}
meData = append(meData, string(jsonData))
}
s.task.Metadata = resources
s.task.Metadata = meData
return nil
}
@ -123,9 +124,9 @@ func (s *scheduler) AssignAndSchedule() error {
// 指定或者标签匹配的结果只有一个集群,给任务信息指定
if len(s.participantIds) == 1 {
s.task.ParticipantId = s.participantIds[0]
replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
result := make(map[int64]string)
result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
//replicas := s.task.Metadata.(map[string]interface{})["spec"].(map[string]interface{})["replicas"].(float64)
//result := make(map[int64]string)
//result[s.participantIds[0]] = strconv.FormatFloat(replicas, 'f', 2, 64)
//s.result = result
return nil
@ -161,9 +162,8 @@ func (s *scheduler) AssignAndSchedule() error {
func (s *scheduler) SaveToDb() error {
for _, participantId := range s.participantIds {
var resources []interface{}
tool.Convert(s.task.Metadata, &resources)
for _, resource := range resources {
for _, resource := range s.task.Metadata {
structForDb, err := s.scheduleService.getNewStructForDb(s.task, resource, participantId)
if err != nil {
return err

30
pkg/utils/kubernetes.go Normal file
View File

@ -0,0 +1,30 @@
package utils
import (
"bytes"
"io"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
syaml "k8s.io/apimachinery/pkg/runtime/serializer/yaml"
"k8s.io/apimachinery/pkg/util/yaml"
)
func StrToInfo(val string) *unstructured.Unstructured {
d := yaml.NewYAMLOrJSONDecoder(bytes.NewBufferString(val), 4096)
var err error
var rawObj runtime.RawExtension
err = d.Decode(&rawObj)
if err == io.EOF {
}
if err != nil {
}
obj := &unstructured.Unstructured{}
syaml.NewDecodingSerializer(unstructured.UnstructuredJSONScheme).Decode(rawObj.Raw, nil, obj)
unstructuredMap, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
unStructureObj := &unstructured.Unstructured{Object: unstructuredMap}
return unStructureObj
}