pcm-hpc/cron/taskcron.go

196 lines
5.7 KiB
Go

package cron
import (
"encoding/json"
"github.com/go-resty/resty/v2"
"github.com/rs/zerolog/log"
coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/client"
"gitlink.org.cn/JointCloud/pcm-hpc/ac"
"gitlink.org.cn/JointCloud/pcm-hpc/paratera"
"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
"net/http"
"strconv"
"strings"
)
var SlurmClients []slurm.Client
var ParateraClients []paratera.Client
var AcClients []ac.Client
var AdapterId int64
var CoreUrl string
func PullTaskInfo() {
//pull task from coordinator
httpClient := resty.New().R()
result, err := httpClient.SetHeader("Content-Type", "application/json").
SetQueryParam("adapterId", strconv.FormatInt(AdapterId, 10)).
Get(CoreUrl + "/pcm/v1/core/pullTaskInfo")
if err != nil {
return
}
var resp coreClient.PullTaskInfoResp
err = json.Unmarshal(result.Body(), &resp)
if err != nil {
return
}
if resp.HpcInfoList != nil && len(resp.HpcInfoList) != 0 {
var oldHpcInfoList []coreClient.HpcInfo
for _, hpcInfo := range resp.HpcInfoList {
hpcInfo.ClusterType = "slurm"
// submit the saved task
if hpcInfo.Status == "Saved" {
switch hpcInfo.ClusterType {
case "ac":
for _, acClient := range AcClients {
clientInfo, _ := acClient.GetClientInfo()
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
if hpcInfo.ClusterId == int64(clientClusterId) {
jober, _ := acClient.Job(ac.JobOptions{})
submitReq := ac.SubmitJobReq{
StrJobManagerID: 1637920656,
MapAppJobInfo: &ac.MapAppJobInfo{
GapCmdFile: hpcInfo.CmdScript,
GapNnode: hpcInfo.NNode,
GapSubmitType: hpcInfo.SubmitType,
GapJobName: hpcInfo.Name,
GapWorkDir: hpcInfo.WorkDir,
GapQueue: hpcInfo.Queue,
GapWallTime: hpcInfo.WallTime,
GapAppType: hpcInfo.AppType,
GapNdcu: strconv.Itoa(1),
GapAppName: hpcInfo.AppName,
GapStdOutFile: hpcInfo.StdOutFile,
GapStdErrFile: hpcInfo.StdErrFile,
},
}
respAC, err := jober.SubmitJob(submitReq)
if err != nil {
log.Print(err)
continue
}
hpcInfo.JobId = respAC.Data
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
hpcInfo.Status = "Running"
}
}
case "slurm":
for _, slurmClient := range SlurmClients {
clientInfo, _ := slurmClient.GetClientInfo()
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
if hpcInfo.ClusterId == int64(clientClusterId) {
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
hpcInfo.Status = "Running"
}
}
case "paratera":
for _, parateraClient := range ParateraClients {
clientInfo, _ := parateraClient.GetClientInfo()
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
if hpcInfo.ClusterId == int64(clientClusterId) {
jober, _ := parateraClient.Job(paratera.JobOptions{})
sbs := paratera.Sbs{
JobGroupName: hpcInfo.AppName,
JobName: hpcInfo.Name,
SubmitProfile: paratera.SubmitProfile{
BootScript: hpcInfo.CmdScript,
WorkingDir: hpcInfo.WorkDir,
},
}
sbsResp, err := jober.SubmitSbsJob(sbs)
if err != nil {
log.Print(err)
continue
}
hpcInfo.JobId = sbsResp.Jid
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
hpcInfo.Status = "Running"
}
}
}
} else { //if state is not "Saved" ,then get state from participant and sync to core
switch hpcInfo.ClusterType {
case "ac":
for _, acClient := range AcClients {
clientInfo, _ := acClient.GetClientInfo()
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
if hpcInfo.ClusterId == int64(clientClusterId) {
jober, _ := acClient.Job(ac.JobOptions{})
resp, err := jober.GetJob(ac.GetJobReq{JobId: hpcInfo.JobId})
if err != nil {
log.Print(err)
continue
}
hpcInfo.Status = resp.Data.List[0].JobStatus
}
}
case "slurm":
for _, slurmClient := range SlurmClients {
clientInfo, _ := slurmClient.GetClientInfo()
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
if hpcInfo.ClusterId == int64(clientClusterId) {
jober, _ := slurmClient.Job(slurm.JobOptions{})
resp, err := jober.GetJob(slurm.GetJobReq{JobId: hpcInfo.JobId})
if err != nil {
log.Print(err)
continue
}
if len(resp.Jobs) == 0 {
log.Print("no such task exist in slurm")
hpcInfo.Status = "COMPLETED"
continue
}
hpcInfo.Status = resp.Jobs[0].JobState
}
}
case "paratera":
for _, parateraClient := range ParateraClients {
clientInfo, _ := parateraClient.GetClientInfo()
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
if hpcInfo.ClusterId == int64(clientClusterId) {
jober, _ := parateraClient.Job(paratera.JobOptions{})
resp, err := jober.GetJob(paratera.GetJobReq{JobId: hpcInfo.JobId})
if err != nil {
log.Print(err)
continue
}
hpcInfo.Status = resp.Status
}
}
}
}
}
// push submitted mark to coordinator
PushReq := coreClient.PushTaskInfoReq{
AdapterId: AdapterId,
HpcInfoList: resp.HpcInfoList,
}
if len(PushReq.HpcInfoList) != 0 {
url := CoreUrl + "/pcm/v1/core/pushTaskInfo"
method := "POST"
jsonStr, _ := json.Marshal(PushReq)
payload := strings.NewReader(string(jsonStr))
client := &http.Client{}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
_, err := client.Do(req)
if err != nil {
return
}
}
}
}