pcm-kubernetes/internal/cron/taskcron.go

188 lines
5.6 KiB
Go

package cron
import (
"context"
"github.com/zeromicro/go-zero/core/logx"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/pcmcore"
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/internal/logic"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/internal/svc"
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
"gitlink.org.cn/jcce-pcm/utils/tool"
v1 "k8s.io/api/batch/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
"time"
)
func SyncTask(svc *svc.ServiceContext) {
var oldCloudList []*pcmcore.CloudInfo
participantId, err := tool.GetParticipantId("etc/kubernetes.yaml")
if err != nil {
return
}
// 查询core端分发下来的任务列表
infoReq := pcmCore.InfoListReq{
ParticipantId: participantId,
}
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
if infoList != nil && len(infoList.CloudInfoList) != 0 {
tool.Convert(&infoList.CloudInfoList, &oldCloudList)
if err != nil {
logx.Error(err)
return
}
// 遍历执行任务操作
for index, _ := range infoList.CloudInfoList {
// 删除任务
if infoList.CloudInfoList[index].Status == "WaitDelete" {
delete(infoList.CloudInfoList[index], svc)
}
// 执行任务
if infoList.CloudInfoList[index].Status == "Saved" {
apply(infoList.CloudInfoList[index], svc)
}
}
// 遍历查询任务信息
for index, _ := range infoList.CloudInfoList {
if infoList.CloudInfoList[index].Kind == "Deployment" {
DeploymentHandler(infoList.CloudInfoList[index], svc)
}
if infoList.CloudInfoList[index].Kind == "StatefulSet" {
StatefulSetHandler(infoList.CloudInfoList[index], svc)
}
if infoList.CloudInfoList[index].Kind == "Job" {
JobHandler(infoList.CloudInfoList[index], svc)
}
}
// 同步信息到core端
SyncInfoReq := pcmCore.SyncInfoReq{
ParticipantId: participantId,
CloudInfoList: make([]*pcmCore.CloudInfo, 0),
}
for _, newCloud := range infoList.CloudInfoList {
for _, oldCloud := range oldCloudList {
if oldCloud.Id == newCloud.Id && oldCloud.Status != newCloud.Status {
SyncInfoReq.CloudInfoList = append(SyncInfoReq.CloudInfoList, newCloud)
}
}
}
if len(SyncInfoReq.CloudInfoList) != 0 {
resp, err := svc.PcmCoreRpc.SyncInfo(context.Background(), &SyncInfoReq)
if err != nil {
logx.Error(resp.Msg)
return
}
}
}
}
// 删除资源
func delete(cloud *pcmCore.CloudInfo, svc *svc.ServiceContext) {
deleteYamlLogic := logic.NewDeleteYamlLogic(context.Background(), svc)
deleteReq := kubernetesclient.ApplyReq{
YamlString: cloud.YamlString,
}
_, err := deleteYamlLogic.DeleteYaml(&deleteReq)
if err != nil {
cloud.Status = "DeleteError"
cloud.Result = err.Error()
return
}
cloud.Status = "Deleted"
}
// 执行资源
func apply(cloud *pcmCore.CloudInfo, svc *svc.ServiceContext) {
applyYamlLogic := logic.NewApplyYamlLogic(context.Background(), svc)
// 提交任务
applyReq := kubernetesclient.ApplyReq{
YamlString: cloud.YamlString,
}
cloud.Status = "Running"
_, err := applyYamlLogic.ApplyYaml(&applyReq)
if err != nil {
cloud.Status = "Failed"
cloud.Result = err.Error()
}
}
func DeploymentHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) {
// 遍历core端任务列表信息
deploymentList, err := svc.ClientSet.AppsV1().Deployments(cloudInfo.Namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
return
}
for _, deployment := range deploymentList.Items {
if deployment.Name == cloudInfo.Name {
cloudInfo.StartTime = tool.TimeRemoveZone(deployment.Status.Conditions[0].LastTransitionTime.Time).String()
// 判断状态
if deployment.Status.Replicas == 0 && deployment.Status.ReadyReplicas != 0 {
cloudInfo.Status = "Updating"
} else if deployment.Status.ReadyReplicas == deployment.Status.Replicas {
cloudInfo.Status = "Running"
} else {
cloudInfo.Status = "Stopped"
}
}
}
}
func StatefulSetHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) {
// 遍历core端任务列表信息
stsList, err := svc.ClientSet.AppsV1().StatefulSets(cloudInfo.Namespace).List(context.Background(), metav1.ListOptions{})
if err != nil {
return
}
for _, sts := range stsList.Items {
if sts.Name == cloudInfo.Name {
cloudInfo.StartTime = tool.TimeRemoveZone(sts.CreationTimestamp.Time).String()
// 判断状态
if sts.Status.Replicas == 0 && sts.Status.ReadyReplicas != 0 {
cloudInfo.Status = "Updating"
} else if sts.Status.ReadyReplicas == sts.Status.Replicas {
cloudInfo.Status = "Running"
} else {
cloudInfo.Status = "Stopped"
}
}
}
}
func JobHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) {
jobList, err := svc.ClientSet.BatchV1().Jobs("").List(context.Background(), metav1.ListOptions{})
if err != nil {
return
}
// 遍历core端任务列表信息
for _, job := range jobList.Items {
if job.Namespace == cloudInfo.Namespace && job.Name == cloudInfo.Name {
cloudInfo.StartTime = tool.TimeRemoveZone(job.Status.StartTime.Time).String()
cloudInfo.RunningTime = time.Now().Sub(job.Status.StartTime.Time).Milliseconds() / 1000
if reflect.DeepEqual(job.Status, v1.JobStatus{}) {
cloudInfo.Status = "Failed"
}
cloudInfo.Status = "Running"
if job.Status.Conditions != nil {
for _, item := range job.Status.Conditions {
if item.Type == "Failed" && item.Status == "True" {
cloudInfo.Status = "Failed"
return
}
if item.Type == "Complete" && item.Status == "True" {
cloudInfo.Status = "Completed"
return
}
}
}
if *job.Spec.Completions <= job.Status.Succeeded {
cloudInfo.Status = "Failed"
}
}
}
}