forked from nudtpc/pcm-kubernetes
188 lines
5.6 KiB
Go
188 lines
5.6 KiB
Go
package cron
|
|
|
|
import (
|
|
"context"
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/client/pcmcore"
|
|
"gitlink.org.cn/jcce-pcm/pcm-coordinator/rpc/pcmCore"
|
|
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/internal/logic"
|
|
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/internal/svc"
|
|
"gitlink.org.cn/jcce-pcm/pcm-participant-kubernetes/kubernetesclient"
|
|
"gitlink.org.cn/jcce-pcm/utils/tool"
|
|
v1 "k8s.io/api/batch/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"reflect"
|
|
"time"
|
|
)
|
|
|
|
func SyncTask(svc *svc.ServiceContext) {
|
|
var oldCloudList []*pcmcore.CloudInfo
|
|
participantId, err := tool.GetParticipantId("etc/kubernetes.yaml")
|
|
if err != nil {
|
|
return
|
|
}
|
|
// 查询core端分发下来的任务列表
|
|
infoReq := pcmCore.InfoListReq{
|
|
ParticipantId: participantId,
|
|
}
|
|
infoList, err := svc.PcmCoreRpc.InfoList(context.Background(), &infoReq)
|
|
|
|
if infoList != nil && len(infoList.CloudInfoList) != 0 {
|
|
tool.Convert(&infoList.CloudInfoList, &oldCloudList)
|
|
if err != nil {
|
|
logx.Error(err)
|
|
return
|
|
}
|
|
// 遍历执行任务操作
|
|
for index, _ := range infoList.CloudInfoList {
|
|
// 删除任务
|
|
if infoList.CloudInfoList[index].Status == "WaitDelete" {
|
|
delete(infoList.CloudInfoList[index], svc)
|
|
}
|
|
// 执行任务
|
|
if infoList.CloudInfoList[index].Status == "Saved" {
|
|
apply(infoList.CloudInfoList[index], svc)
|
|
}
|
|
|
|
}
|
|
// 遍历查询任务信息
|
|
for index, _ := range infoList.CloudInfoList {
|
|
if infoList.CloudInfoList[index].Kind == "Deployment" {
|
|
DeploymentHandler(infoList.CloudInfoList[index], svc)
|
|
}
|
|
if infoList.CloudInfoList[index].Kind == "StatefulSet" {
|
|
StatefulSetHandler(infoList.CloudInfoList[index], svc)
|
|
}
|
|
if infoList.CloudInfoList[index].Kind == "Job" {
|
|
JobHandler(infoList.CloudInfoList[index], svc)
|
|
}
|
|
}
|
|
// 同步信息到core端
|
|
SyncInfoReq := pcmCore.SyncInfoReq{
|
|
ParticipantId: participantId,
|
|
CloudInfoList: make([]*pcmCore.CloudInfo, 0),
|
|
}
|
|
for _, newCloud := range infoList.CloudInfoList {
|
|
for _, oldCloud := range oldCloudList {
|
|
if oldCloud.Id == newCloud.Id && oldCloud.Status != newCloud.Status {
|
|
SyncInfoReq.CloudInfoList = append(SyncInfoReq.CloudInfoList, newCloud)
|
|
}
|
|
}
|
|
}
|
|
if len(SyncInfoReq.CloudInfoList) != 0 {
|
|
resp, err := svc.PcmCoreRpc.SyncInfo(context.Background(), &SyncInfoReq)
|
|
if err != nil {
|
|
logx.Error(resp.Msg)
|
|
return
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
// 删除资源
|
|
func delete(cloud *pcmCore.CloudInfo, svc *svc.ServiceContext) {
|
|
deleteYamlLogic := logic.NewDeleteYamlLogic(context.Background(), svc)
|
|
deleteReq := kubernetesclient.ApplyReq{
|
|
YamlString: cloud.YamlString,
|
|
}
|
|
_, err := deleteYamlLogic.DeleteYaml(&deleteReq)
|
|
if err != nil {
|
|
cloud.Status = "DeleteError"
|
|
cloud.Result = err.Error()
|
|
return
|
|
}
|
|
cloud.Status = "Deleted"
|
|
}
|
|
|
|
// 执行资源
|
|
func apply(cloud *pcmCore.CloudInfo, svc *svc.ServiceContext) {
|
|
applyYamlLogic := logic.NewApplyYamlLogic(context.Background(), svc)
|
|
// 提交任务
|
|
applyReq := kubernetesclient.ApplyReq{
|
|
YamlString: cloud.YamlString,
|
|
}
|
|
cloud.Status = "Running"
|
|
_, err := applyYamlLogic.ApplyYaml(&applyReq)
|
|
if err != nil {
|
|
cloud.Status = "Failed"
|
|
cloud.Result = err.Error()
|
|
}
|
|
}
|
|
|
|
func DeploymentHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) {
|
|
// 遍历core端任务列表信息
|
|
deploymentList, err := svc.ClientSet.AppsV1().Deployments(cloudInfo.Namespace).List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, deployment := range deploymentList.Items {
|
|
if deployment.Name == cloudInfo.Name {
|
|
cloudInfo.StartTime = tool.TimeRemoveZone(deployment.Status.Conditions[0].LastTransitionTime.Time).String()
|
|
// 判断状态
|
|
if deployment.Status.Replicas == 0 && deployment.Status.ReadyReplicas != 0 {
|
|
cloudInfo.Status = "Updating"
|
|
} else if deployment.Status.ReadyReplicas == deployment.Status.Replicas {
|
|
cloudInfo.Status = "Running"
|
|
} else {
|
|
cloudInfo.Status = "Stopped"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func StatefulSetHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) {
|
|
// 遍历core端任务列表信息
|
|
stsList, err := svc.ClientSet.AppsV1().StatefulSets(cloudInfo.Namespace).List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
return
|
|
}
|
|
for _, sts := range stsList.Items {
|
|
if sts.Name == cloudInfo.Name {
|
|
cloudInfo.StartTime = tool.TimeRemoveZone(sts.CreationTimestamp.Time).String()
|
|
// 判断状态
|
|
if sts.Status.Replicas == 0 && sts.Status.ReadyReplicas != 0 {
|
|
cloudInfo.Status = "Updating"
|
|
} else if sts.Status.ReadyReplicas == sts.Status.Replicas {
|
|
cloudInfo.Status = "Running"
|
|
} else {
|
|
cloudInfo.Status = "Stopped"
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func JobHandler(cloudInfo *pcmCore.CloudInfo, svc *svc.ServiceContext) {
|
|
jobList, err := svc.ClientSet.BatchV1().Jobs("").List(context.Background(), metav1.ListOptions{})
|
|
if err != nil {
|
|
return
|
|
}
|
|
// 遍历core端任务列表信息
|
|
for _, job := range jobList.Items {
|
|
if job.Namespace == cloudInfo.Namespace && job.Name == cloudInfo.Name {
|
|
cloudInfo.StartTime = tool.TimeRemoveZone(job.Status.StartTime.Time).String()
|
|
cloudInfo.RunningTime = time.Now().Sub(job.Status.StartTime.Time).Milliseconds() / 1000
|
|
if reflect.DeepEqual(job.Status, v1.JobStatus{}) {
|
|
cloudInfo.Status = "Failed"
|
|
}
|
|
cloudInfo.Status = "Running"
|
|
if job.Status.Conditions != nil {
|
|
for _, item := range job.Status.Conditions {
|
|
if item.Type == "Failed" && item.Status == "True" {
|
|
cloudInfo.Status = "Failed"
|
|
return
|
|
}
|
|
if item.Type == "Complete" && item.Status == "True" {
|
|
cloudInfo.Status = "Completed"
|
|
return
|
|
}
|
|
}
|
|
}
|
|
if *job.Spec.Completions <= job.Status.Succeeded {
|
|
cloudInfo.Status = "Failed"
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|