package cron

import (
	"encoding/json"
	"net/http"
	"strconv"
	"strings"

	"github.com/go-resty/resty/v2"
	"github.com/rs/zerolog/log"

	coreClient "gitlink.org.cn/JointCloud/pcm-coordinator/client"
	"gitlink.org.cn/JointCloud/pcm-hpc/ac"
	"gitlink.org.cn/JointCloud/pcm-hpc/paratera"
	"gitlink.org.cn/JointCloud/pcm-hpc/slurm"
)
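
// Package-level state, expected to be initialized by the adapter at startup:
// one client list per scheduler type, plus this adapter's ID and the
// coordinator's base URL.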
var (
	SlurmClients    []slurm.Client
	ParateraClients []paratera.Client
	AcClients       []ac.Client
	AdapterId       int64
	CoreUrl         string
)
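
// PullTaskInfo pulls this adapter's HPC tasks from the coordinator, submits
// tasks still in the "Saved" state to their target cluster, refreshes the
// status of already-submitted tasks, and pushes the results back to the
// coordinator.
//
// A minimal usage sketch (assumption: the caller drives this periodically;
// the 30-second cadence below is illustrative, not part of this file):
//
//	ticker := time.NewTicker(30 * time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		PullTaskInfo()
//	}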
func PullTaskInfo() {
	// Pull the task list for this adapter from the coordinator.
	httpClient := resty.New().R()

	result, err := httpClient.SetHeader("Content-Type", "application/json").
		SetQueryParam("adapterId", strconv.FormatInt(AdapterId, 10)).
		Get(CoreUrl + "/pcm/v1/core/pullTaskInfo")
	if err != nil {
		return
	}

	var resp coreClient.PullTaskInfoResp
	if err = json.Unmarshal(result.Body(), &resp); err != nil {
		return
	}
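
	// Each task is handled according to its state: "Saved" tasks are
	// submitted to the cluster they belong to, while already-submitted tasks
	// have their status re-read from the cluster and synced to the core.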
	if len(resp.HpcInfoList) != 0 {
		for _, hpcInfo := range resp.HpcInfoList {
			// Submit tasks that are still waiting in the "Saved" state.
			if hpcInfo.Status == "Saved" {
				switch hpcInfo.ClusterType {
				case "ac":
					for _, acClient := range AcClients {
						clientInfo, _ := acClient.GetClientInfo()
						clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
						if hpcInfo.ClusterId != int64(clientClusterId) {
							continue
						}
						jober, _ := acClient.Job(ac.JobOptions{})

						submitReq := ac.SubmitJobReq{
							StrJobManagerID: 1637920656, // fixed job manager ID on the ac cluster
							MapAppJobInfo: &ac.MapAppJobInfo{
								GapCmdFile:    hpcInfo.CmdScript,
								GapNnode:      hpcInfo.NNode,
								GapSubmitType: hpcInfo.SubmitType,
								GapJobName:    hpcInfo.Name,
								GapWorkDir:    hpcInfo.WorkDir,
								GapQueue:      hpcInfo.Queue,
								GapWallTime:   hpcInfo.WallTime,
								GapAppType:    hpcInfo.AppType,
								GapNdcu:       "1", // fixed single-DCU allocation
								GapAppName:    hpcInfo.AppName,
								GapStdOutFile: hpcInfo.StdOutFile,
								GapStdErrFile: hpcInfo.StdErrFile,
							},
						}
						respAC, err := jober.SubmitJob(submitReq)
						if err != nil {
							log.Error().Err(err).Msg("submit job to ac failed")
							continue
						}
						hpcInfo.JobId = respAC.Data
						hpcInfo.Status = "Running"
					}
case "slurm":
|
|
for _, slurmClient := range SlurmClients {
|
|
clientInfo, _ := slurmClient.GetClientInfo()
|
|
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
|
|
if hpcInfo.ClusterId == int64(clientClusterId) {
|
|
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
|
|
hpcInfo.Status = "Running"
|
|
}
|
|
}
|
|
|
|
case "paratera":
|
|
for _, parateraClient := range ParateraClients {
|
|
clientInfo, _ := parateraClient.GetClientInfo()
|
|
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
|
|
if hpcInfo.ClusterId == int64(clientClusterId) {
|
|
jober, _ := parateraClient.Job(paratera.JobOptions{})
|
|
sbs := paratera.Sbs{
|
|
JobGroupName: hpcInfo.AppName,
|
|
JobName: hpcInfo.Name,
|
|
SubmitProfile: paratera.SubmitProfile{
|
|
BootScript: hpcInfo.CmdScript,
|
|
WorkingDir: hpcInfo.WorkDir,
|
|
},
|
|
}
|
|
sbsResp, err := jober.SubmitSbsJob(sbs)
|
|
if err != nil {
|
|
log.Print(err)
|
|
continue
|
|
}
|
|
hpcInfo.JobId = sbsResp.Jid
|
|
oldHpcInfoList = append(oldHpcInfoList, *hpcInfo)
|
|
hpcInfo.Status = "Running"
|
|
}
|
|
}
|
|
|
|
}
|
|
			} else {
				// The task was already submitted: read its current state from
				// the participant cluster so it can be synced to the core.
				switch hpcInfo.ClusterType {
case "ac":
|
|
for _, acClient := range AcClients {
|
|
clientInfo, _ := acClient.GetClientInfo()
|
|
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
|
|
if hpcInfo.ClusterId == int64(clientClusterId) {
|
|
jober, _ := acClient.Job(ac.JobOptions{})
|
|
resp, err := jober.GetJob(ac.GetJobReq{JobId: hpcInfo.JobId})
|
|
if err != nil {
|
|
log.Print(err)
|
|
continue
|
|
}
|
|
hpcInfo.Status = resp.Data.List[0].JobStatus
|
|
}
|
|
}
|
|
|
|
case "slurm":
|
|
for _, slurmClient := range SlurmClients {
|
|
clientInfo, _ := slurmClient.GetClientInfo()
|
|
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
|
|
if hpcInfo.ClusterId == int64(clientClusterId) {
|
|
jober, _ := slurmClient.Job(slurm.JobOptions{})
|
|
resp, err := jober.GetJob(slurm.GetJobReq{JobId: hpcInfo.JobId})
|
|
if err != nil {
|
|
log.Print(err)
|
|
continue
|
|
}
|
|
if len(resp.Jobs) == 0 {
|
|
log.Print("no such task exist in slurm")
|
|
hpcInfo.Status = "COMPLETED"
|
|
continue
|
|
}
|
|
hpcInfo.Status = resp.Jobs[0].JobState
|
|
}
|
|
}
|
|
|
|
case "paratera":
|
|
for _, parateraClient := range ParateraClients {
|
|
clientInfo, _ := parateraClient.GetClientInfo()
|
|
clientClusterId, _ := strconv.Atoi(clientInfo.ClusterId)
|
|
if hpcInfo.ClusterId == int64(clientClusterId) {
|
|
jober, _ := parateraClient.Job(paratera.JobOptions{})
|
|
resp, err := jober.GetJob(paratera.GetJobReq{JobId: hpcInfo.JobId})
|
|
if err != nil {
|
|
log.Print(err)
|
|
continue
|
|
}
|
|
hpcInfo.Status = resp.Status
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
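
		// The loop above mutates entries in place through the *HpcInfo
		// pointers, so resp.HpcInfoList now carries the refreshed states.
		// Push them back to the coordinator.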
		pushReq := coreClient.PushTaskInfoReq{
			AdapterId:   AdapterId,
			HpcInfoList: resp.HpcInfoList,
		}

		if len(pushReq.HpcInfoList) != 0 {
			url := CoreUrl + "/pcm/v1/core/pushTaskInfo"

			jsonStr, _ := json.Marshal(pushReq)
			payload := strings.NewReader(string(jsonStr))

			client := &http.Client{}
			req, err := http.NewRequest(http.MethodPost, url, payload)
			if err != nil {
				return
			}
			req.Header.Add("Content-Type", "application/json")

			pushResp, err := client.Do(req)
			if err != nil {
				return
			}
			// Close the response body so the connection can be reused.
			pushResp.Body.Close()
		}
	}
}