diff --git a/common/assets/confs/middleware.json b/common/assets/confs/middleware.json index 3614a7d..e4a3a21 100644 --- a/common/assets/confs/middleware.json +++ b/common/assets/confs/middleware.json @@ -33,5 +33,17 @@ }, "cloudreamStorage": { "url": "http://localhost:32010" - } + }, + "pcmHub": [ + { + "name": "OpenI", + "APIKey": "", + "URL": "https://qaygilqyldcc.test.jointcloud.net:443" + }, + { + "name": "ModelArts", + "APIKey": "", + "URL": "https://fwoofrxezczr.test.jointcloud.net:443" + } + ] } \ No newline at end of file diff --git a/common/assets/confs/test.json b/common/assets/confs/test.json index c6d5f43..402c336 100644 --- a/common/assets/confs/test.json +++ b/common/assets/confs/test.json @@ -1,36 +1,15 @@ { - "inputs": [ - { - "url": "https://pcm2-bucket2.obs.cn-north-4.myhuaweicloud.com:443/cds/blocks/50/Full50B464DB2FDDC29D0380D9FFAB6D944FAF5C7624955D757939280590F01F3ECD?AWSAccessKeyId=CANMDYKXIWRDR0IYDB32&Expires=1737685829&Signature=m4IdspV4RQcJgm4Ls9%2Fosaxr2o4%3D" + "userID": 1, + "packageID": 1126, + "uploadParams": { + "dataType": "dataset", + "uploadInfo": { + "type": "url", + "url": "https://openi.pcl.ac.cn/somunslotus/meta-weight-net/grampus/train-job/501239", + "targetClusters": [ + "1865927992266461184" + ] }, - { - "url": "https://pcm2-bucket2.obs.cn-north-4.myhuaweicloud.com:443/cds/blocks/C0/FullC036CBB7553A909F8B8877D4461924307F27ECB66CFF928EEEAFD569C3887E29?AWSAccessKeyId=CANMDYKXIWRDR0IYDB32&Expires=1737685829&Signature=pQTrQSPUw3k4h4Q6yfZp3a8wQ6M%3D" - }, - { - "url": "https://pcm2-bucket2.obs.cn-north-4.myhuaweicloud.com:443/cds/blocks/54/Full543F38D9F524238AC0239263AA0DD4B4328763818EA98A7A5F72E59748FDA27A?AWSAccessKeyId=CANMDYKXIWRDR0IYDB32&Expires=1737685829&Signature=voCaGtEJ6z%2Bp8TWl8hJw4ytatnE%3D" - } - ], - "outputs": [ - "/tmp/IGsnvEIyru_0", - "/tmp/IGsnvEIyru_1", - "/tmp/IGsnvEIyru_2" - ], - "coefs": [ - [ - 186, - 2, - 185 - ], - [ - 186, - 3, - 184 - ], - [ - 1, - 1, - 1 - ] - ], - "chunkSize": 5242880 + "dataName": "501239" + } } \ No newline at end of file diff --git a/common/pkgs/db/access.go b/common/pkgs/db/access.go index a43b65f..7f63ce1 100644 --- a/common/pkgs/db/access.go +++ b/common/pkgs/db/access.go @@ -149,7 +149,7 @@ func (db *AccessDB) GetUserBySsoID(ctx SQLContext, id string) (cdssdk.UserID, er return ret, nil } -func (db *AccessDB) AddUser(ctx SQLContext, user sch.User) error { +func (db *AccessDB) AddUser(ctx SQLContext, user sch.User, buckets []sch.Bucket) error { // 根据ssoID查询数据是否存在 var existUser sch.User err := ctx.Table("users").Where("sso_id = ?", user.SsoID).Find(&existUser).Error @@ -160,9 +160,33 @@ func (db *AccessDB) AddUser(ctx SQLContext, user sch.User) error { return fmt.Errorf("user already exists") } - return ctx.Table("users").Create(&user).Error + tx := ctx.Begin() + err = tx.Table("users").Create(&user).Error + if err != nil { + tx.Rollback() + return err + } + + for _, bucket := range buckets { + err = tx.Table("bucket").Create(&bucket).Error + if err != nil { + tx.Rollback() + return err + } + } + tx.Commit() + + return nil } func (db *AccessDB) UpdateUser(ctx SQLContext, id string, username string) error { return ctx.Table("users").Where("sso_id = ?", id).Update("username", username).Error } + +func (db *AccessDB) GetBucketByUserID(ctx SQLContext, userID cdssdk.UserID, dataType string) (*sch.Bucket, error) { + var ret sch.Bucket + if err := ctx.Table("bucket").Where("user_id = ? and data_type = ?", userID, dataType).First(&ret).Error; err != nil { + return &ret, err + } + return &ret, nil +} diff --git a/log/schedulerclient.log b/log/schedulerclient.log index 6716478..9bad8d1 100644 --- a/log/schedulerclient.log +++ b/log/schedulerclient.log @@ -2125,3 +2125,74 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the 2025-01-23 16:42:28 [DEBU] submitting job 2025-01-25 08:01:27 [INFO] start serving http at: :7891 2025-01-25 08:02:56 [INFO] start serving http at: :7891 +2025-02-10 10:44:47 [INFO] start serving http at: :7891 +2025-02-10 10:49:31 [INFO] start serving http at: :7891 +2025-02-10 10:51:33 [INFO] start serving http at: :7891 +2025-02-10 10:53:18 [INFO] start serving http at: :7891 +2025-02-10 10:57:22 [WARN] [HTTP:JobSet.QueryUploaded] getting service list: failed to query uploaded data: Get "http://localhost:32010/object/listByPath?isPrefix=true&packageID=1093&path=&userID=1": dial tcp [::1]:32010: connectex: No connection could be made because the target machine actively refused it. +2025-02-10 11:13:36 [INFO] start serving http at: :7891 +2025-02-10 11:28:40 [INFO] start serving http at: :7891 +2025-02-10 11:30:25 [INFO] start serving http at: :7891 +2025-02-10 11:35:21 [INFO] start serving http at: :7891 +2025-02-10 14:51:59 [INFO] start serving http at: :7891 +2025-02-10 14:54:33 [INFO] start serving http at: :7891 +2025-02-10 14:56:11 [INFO] start serving http at: :7891 +2025-02-10 14:57:00 [INFO] start serving http at: :7891 +2025-02-10 15:54:36 [INFO] start serving http at: :7891 +2025-02-10 15:57:16 [INFO] start serving http at: :7891 +2025-02-10 15:58:50 [WARN] get access requests: Error 1364 (HY000): Field 'password' doesn't have a default value +2025-02-10 16:00:04 [INFO] start serving http at: :7891 +2025-02-10 16:02:07 [INFO] start serving http at: :7891 +2025-02-10 16:03:47 [INFO] start serving http at: :7891 +2025-02-10 16:03:56 [WARN] get access requests: user already exists +2025-02-11 09:59:01 [INFO] start serving http at: :7891 +2025-02-11 09:59:48 [DEBU] submitting job +2025-02-11 10:02:09 [INFO] start serving http at: :7891 +2025-02-11 10:02:17 [DEBU] submitting job +2025-02-11 10:04:34 [INFO] start serving http at: :7891 +2025-02-11 10:04:36 [DEBU] submitting job +2025-02-11 10:35:11 [INFO] start serving http at: :7891 +2025-02-11 10:35:20 [DEBU] submitting job +2025-02-11 10:42:38 [INFO] start serving http at: :7891 +2025-02-11 10:42:49 [DEBU] submitting job +2025-02-11 10:43:31 [ERRO] create task: Post "https://comnet.jointcloud.net/pcm/v1/schedule/createTask": dial tcp: lookup comnet.jointcloud.net: no such host +2025-02-11 10:43:31 [INFO] jobID: %s change state from %s to %s0&{0xc0000ca8f0 1} &{0xc000324d00} +2025-02-11 10:43:31 [INFO] [JobID:0] state changed: *state2.PCMJobCreate -> *state.Completed +2025-02-11 10:43:31 [INFO] [JobID:0] [LastState:*state2.PCMJobCreate] job failed with: create task: Post "https://comnet.jointcloud.net/pcm/v1/schedule/createTask": dial tcp: lookup comnet.jointcloud.net: no such host +2025-02-11 10:43:31 [INFO] job set 0 completed +2025-02-11 10:43:57 [DEBU] submitting job +2025-02-11 10:44:26 [INFO] jobID: %s change state from %s to %s1&{0xc0000cb340 1} &{1 1889139808467423233 [{model 1070 EFILE [1777240145309732864]}] } +2025-02-11 10:44:26 [INFO] [JobID:1] state changed: *state2.PCMJobCreate -> *state2.DataSchedule +2025-02-11 10:44:31 [ERRO] schedule data: code: 500, message: +2025-02-11 10:44:31 [INFO] jobID: %s change state from %s to %s1&{1 1889139808467423233 [{model 1070 EFILE [1777240145309732864]}] } &{1889139808467423233 schedule data: code: 500, message: } +2025-02-11 10:44:31 [INFO] [JobID:1] state changed: *state2.DataSchedule -> *state2.PCMJobCancel +2025-02-11 10:44:31 [ERRO] unknow response content type: +2025-02-11 10:44:31 [INFO] jobID: %s change state from %s to %s1&{1889139808467423233 schedule data: code: 500, message: } &{0xc00023a010} +2025-02-11 10:44:31 [INFO] [JobID:1] state changed: *state2.PCMJobCancel -> *state.Completed +2025-02-11 10:44:31 [INFO] [JobID:1] [LastState:*state2.PCMJobCancel] job failed with: unknow response content type: +2025-02-11 10:44:31 [INFO] job set 1 completed +2025-02-12 15:15:26 [INFO] start serving http at: :7891 +2025-02-12 15:21:02 [WARN] get access requests: failed to create user: unknow response content type: text/plain, status: 404, body(prefix): 404 page not found +2025-02-12 15:21:30 [WARN] get access requests: failed to create user: unknow response content type: text/plain, status: 404, body(prefix): 404 page not found +2025-02-12 15:21:57 [WARN] get access requests: failed to create user: unknow response content type: text/plain, status: 404, body(prefix): 404 page not found +2025-02-12 15:23:12 [WARN] get access requests: failed to create user: unknow response content type: text/plain, status: 404, body(prefix): 404 page not found +2025-02-12 15:57:38 [INFO] start serving http at: :7891 +2025-02-12 16:29:40 [INFO] start serving http at: :7891 +2025-02-12 16:42:18 [INFO] start serving http at: :7891 +2025-02-12 16:44:50 [DEBU] uploading job +2025-02-12 16:49:42 [INFO] start serving http at: :7891 +2025-02-12 16:52:19 [DEBU] uploading job +2025-02-12 16:52:23 [INFO] start serving http at: :7891 +2025-02-12 16:52:45 [DEBU] uploading job +2025-02-12 16:54:10 [ERRO] upload data: code: 500, message: +2025-02-12 16:54:10 [INFO] jobID: %s change state from %s to %s0&{1 0xc0005b9ae0 code {1 0}} &{0xc0001fa0a0} +2025-02-12 16:54:10 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-12 16:54:10 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-12 16:54:10 [INFO] job set 0 completed +2025-02-12 16:54:18 [INFO] start serving http at: :7891 +2025-02-12 16:54:51 [DEBU] uploading job +2025-02-12 16:56:48 [ERRO] upload data: code: 500, message: +2025-02-12 16:56:48 [INFO] jobID: %s change state from %s to %s0&{1 0xc0002480a0 code {1 0}} &{0xc00067c700} +2025-02-12 16:56:48 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-12 16:56:48 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-12 16:56:48 [INFO] job set 0 completed diff --git a/magefiles/mage_output_file.go b/magefiles/mage_output_file.go new file mode 100644 index 0000000..43195a1 --- /dev/null +++ b/magefiles/mage_output_file.go @@ -0,0 +1,727 @@ +// +build ignore + +package main + +import ( + "context" + _flag "flag" + _fmt "fmt" + _ioutil "io/ioutil" + _log "log" + "os" + "os/signal" + _filepath "path/filepath" + _sort "sort" + "strconv" + _strings "strings" + "syscall" + _tabwriter "text/tabwriter" + "time" + +) + +func main() { + // Use local types and functions in order to avoid name conflicts with additional magefiles. + type arguments struct { + Verbose bool // print out log statements + List bool // print out a list of targets + Help bool // print out help for a specific target + Timeout time.Duration // set a timeout to running the targets + Args []string // args contain the non-flag command-line arguments + } + + parseBool := func(env string) bool { + val := os.Getenv(env) + if val == "" { + return false + } + b, err := strconv.ParseBool(val) + if err != nil { + _log.Printf("warning: environment variable %s is not a valid bool value: %v", env, val) + return false + } + return b + } + + parseDuration := func(env string) time.Duration { + val := os.Getenv(env) + if val == "" { + return 0 + } + d, err := time.ParseDuration(val) + if err != nil { + _log.Printf("warning: environment variable %s is not a valid duration value: %v", env, val) + return 0 + } + return d + } + args := arguments{} + fs := _flag.FlagSet{} + fs.SetOutput(os.Stdout) + + // default flag set with ExitOnError and auto generated PrintDefaults should be sufficient + fs.BoolVar(&args.Verbose, "v", parseBool("MAGEFILE_VERBOSE"), "show verbose output when running targets") + fs.BoolVar(&args.List, "l", parseBool("MAGEFILE_LIST"), "list targets for this binary") + fs.BoolVar(&args.Help, "h", parseBool("MAGEFILE_HELP"), "print out help for a specific target") + fs.DurationVar(&args.Timeout, "t", parseDuration("MAGEFILE_TIMEOUT"), "timeout in duration parsable format (e.g. 5m30s)") + fs.Usage = func() { + _fmt.Fprintf(os.Stdout, ` +%s [options] [target] + +Commands: + -l list targets in this binary + -h show this help + +Options: + -h show description of a target + -t + timeout in duration parsable format (e.g. 5m30s) + -v show verbose output when running targets + `[1:], _filepath.Base(os.Args[0])) + } + if err := fs.Parse(os.Args[1:]); err != nil { + // flag will have printed out an error already. + return + } + args.Args = fs.Args() + if args.Help && len(args.Args) == 0 { + fs.Usage() + return + } + + // color is ANSI color type + type color int + + // If you add/change/remove any items in this constant, + // you will need to run "stringer -type=color" in this directory again. + // NOTE: Please keep the list in an alphabetical order. + const ( + black color = iota + red + green + yellow + blue + magenta + cyan + white + brightblack + brightred + brightgreen + brightyellow + brightblue + brightmagenta + brightcyan + brightwhite + ) + + // AnsiColor are ANSI color codes for supported terminal colors. + var ansiColor = map[color]string{ + black: "\u001b[30m", + red: "\u001b[31m", + green: "\u001b[32m", + yellow: "\u001b[33m", + blue: "\u001b[34m", + magenta: "\u001b[35m", + cyan: "\u001b[36m", + white: "\u001b[37m", + brightblack: "\u001b[30;1m", + brightred: "\u001b[31;1m", + brightgreen: "\u001b[32;1m", + brightyellow: "\u001b[33;1m", + brightblue: "\u001b[34;1m", + brightmagenta: "\u001b[35;1m", + brightcyan: "\u001b[36;1m", + brightwhite: "\u001b[37;1m", + } + + const _color_name = "blackredgreenyellowbluemagentacyanwhitebrightblackbrightredbrightgreenbrightyellowbrightbluebrightmagentabrightcyanbrightwhite" + + var _color_index = [...]uint8{0, 5, 8, 13, 19, 23, 30, 34, 39, 50, 59, 70, 82, 92, 105, 115, 126} + + colorToLowerString := func (i color) string { + if i < 0 || i >= color(len(_color_index)-1) { + return "color(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _color_name[_color_index[i]:_color_index[i+1]] + } + + // ansiColorReset is an ANSI color code to reset the terminal color. + const ansiColorReset = "\033[0m" + + // defaultTargetAnsiColor is a default ANSI color for colorizing targets. + // It is set to Cyan as an arbitrary color, because it has a neutral meaning + var defaultTargetAnsiColor = ansiColor[cyan] + + getAnsiColor := func(color string) (string, bool) { + colorLower := _strings.ToLower(color) + for k, v := range ansiColor { + colorConstLower := colorToLowerString(k) + if colorConstLower == colorLower { + return v, true + } + } + return "", false + } + + // Terminals which don't support color: + // TERM=vt100 + // TERM=cygwin + // TERM=xterm-mono + var noColorTerms = map[string]bool{ + "vt100": false, + "cygwin": false, + "xterm-mono": false, + } + + // terminalSupportsColor checks if the current console supports color output + // + // Supported: + // linux, mac, or windows's ConEmu, Cmder, putty, git-bash.exe, pwsh.exe + // Not supported: + // windows cmd.exe, powerShell.exe + terminalSupportsColor := func() bool { + envTerm := os.Getenv("TERM") + if _, ok := noColorTerms[envTerm]; ok { + return false + } + return true + } + + // enableColor reports whether the user has requested to enable a color output. + enableColor := func() bool { + b, _ := strconv.ParseBool(os.Getenv("MAGEFILE_ENABLE_COLOR")) + return b + } + + // targetColor returns the ANSI color which should be used to colorize targets. + targetColor := func() string { + s, exists := os.LookupEnv("MAGEFILE_TARGET_COLOR") + if exists == true { + if c, ok := getAnsiColor(s); ok == true { + return c + } + } + return defaultTargetAnsiColor + } + + // store the color terminal variables, so that the detection isn't repeated for each target + var enableColorValue = enableColor() && terminalSupportsColor() + var targetColorValue = targetColor() + + printName := func(str string) string { + if enableColorValue { + return _fmt.Sprintf("%s%s%s", targetColorValue, str, ansiColorReset) + } else { + return str + } + } + + list := func() error { + + targets := map[string]string{ + "amD64": "[配置项]设置编译架构为amd64", + "advisor": "", + "all": "", + "bin": "", + "client": "", + "collector": "", + "confs": "", + "executor": "", + "linux": "[配置项]设置编译平台为linux", + "manager": "", + "scheduleMiddleware": "", + "scripts": "", + "win": "[配置项]设置编译平台为windows", + } + + keys := make([]string, 0, len(targets)) + for name := range targets { + keys = append(keys, name) + } + _sort.Strings(keys) + + _fmt.Println("Targets:") + w := _tabwriter.NewWriter(os.Stdout, 0, 4, 4, ' ', 0) + for _, name := range keys { + _fmt.Fprintf(w, " %v\t%v\n", printName(name), targets[name]) + } + err := w.Flush() + return err + } + + var ctx context.Context + ctxCancel := func(){} + + // by deferring in a closure, we let the cancel function get replaced + // by the getContext function. + defer func() { + ctxCancel() + }() + + getContext := func() (context.Context, func()) { + if ctx == nil { + if args.Timeout != 0 { + ctx, ctxCancel = context.WithTimeout(context.Background(), args.Timeout) + } else { + ctx, ctxCancel = context.WithCancel(context.Background()) + } + } + + return ctx, ctxCancel + } + + runTarget := func(logger *_log.Logger, fn func(context.Context) error) interface{} { + var err interface{} + ctx, cancel := getContext() + d := make(chan interface{}) + go func() { + defer func() { + err := recover() + d <- err + }() + err := fn(ctx) + d <- err + }() + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT) + select { + case <-sigCh: + logger.Println("cancelling mage targets, waiting up to 5 seconds for cleanup...") + cancel() + cleanupCh := time.After(5 * time.Second) + + select { + // target exited by itself + case err = <-d: + return err + // cleanup timeout exceeded + case <-cleanupCh: + return _fmt.Errorf("cleanup timeout exceeded") + // second SIGINT received + case <-sigCh: + logger.Println("exiting mage") + return _fmt.Errorf("exit forced") + } + case <-ctx.Done(): + cancel() + e := ctx.Err() + _fmt.Printf("ctx err: %v\n", e) + return e + case err = <-d: + // we intentionally don't cancel the context here, because + // the next target will need to run with the same context. + return err + } + } + // This is necessary in case there aren't any targets, to avoid an unused + // variable error. + _ = runTarget + + handleError := func(logger *_log.Logger, err interface{}) { + if err != nil { + logger.Printf("Error: %+v\n", err) + type code interface { + ExitStatus() int + } + if c, ok := err.(code); ok { + os.Exit(c.ExitStatus()) + } + os.Exit(1) + } + } + _ = handleError + + // Set MAGEFILE_VERBOSE so mg.Verbose() reflects the flag value. + if args.Verbose { + os.Setenv("MAGEFILE_VERBOSE", "1") + } else { + os.Setenv("MAGEFILE_VERBOSE", "0") + } + + _log.SetFlags(0) + if !args.Verbose { + _log.SetOutput(_ioutil.Discard) + } + logger := _log.New(os.Stderr, "", 0) + if args.List { + if err := list(); err != nil { + _log.Println(err) + os.Exit(1) + } + return + } + + if args.Help { + if len(args.Args) < 1 { + logger.Println("no target specified") + os.Exit(2) + } + switch _strings.ToLower(args.Args[0]) { + case "amd64": + _fmt.Println("[配置项]设置编译架构为amd64") + _fmt.Println() + + _fmt.Print("Usage:\n\n\tmage amd64\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "advisor": + + _fmt.Print("Usage:\n\n\tmage advisor\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "all": + + _fmt.Print("Usage:\n\n\tmage all\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "bin": + + _fmt.Print("Usage:\n\n\tmage bin\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "client": + + _fmt.Print("Usage:\n\n\tmage client\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "collector": + + _fmt.Print("Usage:\n\n\tmage collector\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "confs": + + _fmt.Print("Usage:\n\n\tmage confs\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "executor": + + _fmt.Print("Usage:\n\n\tmage executor\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "linux": + _fmt.Println("[配置项]设置编译平台为linux") + _fmt.Println() + + _fmt.Print("Usage:\n\n\tmage linux\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "manager": + + _fmt.Print("Usage:\n\n\tmage manager\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "schedulemiddleware": + + _fmt.Print("Usage:\n\n\tmage schedulemiddleware\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "scripts": + + _fmt.Print("Usage:\n\n\tmage scripts\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + case "win": + _fmt.Println("[配置项]设置编译平台为windows") + _fmt.Println() + + _fmt.Print("Usage:\n\n\tmage win\n\n") + var aliases []string + if len(aliases) > 0 { + _fmt.Printf("Aliases: %s\n\n", _strings.Join(aliases, ", ")) + } + return + default: + logger.Printf("Unknown target: %q\n", args.Args[0]) + os.Exit(2) + } + } + if len(args.Args) < 1 { + if err := list(); err != nil { + logger.Println("Error:", err) + os.Exit(1) + } + return + } + for x := 0; x < len(args.Args); { + target := args.Args[x] + x++ + + // resolve aliases + switch _strings.ToLower(target) { + + } + + switch _strings.ToLower(target) { + + case "amd64": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"AMD64\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "AMD64") + } + + wrapFn := func(ctx context.Context) error { + AMD64() + return nil + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "advisor": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Advisor\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Advisor") + } + + wrapFn := func(ctx context.Context) error { + return Advisor() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "all": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"All\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "All") + } + + wrapFn := func(ctx context.Context) error { + return All() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "bin": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Bin\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Bin") + } + + wrapFn := func(ctx context.Context) error { + return Bin() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "client": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Client\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Client") + } + + wrapFn := func(ctx context.Context) error { + return Client() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "collector": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Collector\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Collector") + } + + wrapFn := func(ctx context.Context) error { + return Collector() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "confs": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Confs\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Confs") + } + + wrapFn := func(ctx context.Context) error { + return Confs() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "executor": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Executor\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Executor") + } + + wrapFn := func(ctx context.Context) error { + return Executor() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "linux": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Linux\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Linux") + } + + wrapFn := func(ctx context.Context) error { + Linux() + return nil + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "manager": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Manager\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Manager") + } + + wrapFn := func(ctx context.Context) error { + return Manager() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "schedulemiddleware": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"ScheduleMiddleware\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "ScheduleMiddleware") + } + + wrapFn := func(ctx context.Context) error { + return ScheduleMiddleware() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "scripts": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Scripts\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Scripts") + } + + wrapFn := func(ctx context.Context) error { + return Scripts() + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + case "win": + expected := x + 0 + if expected > len(args.Args) { + // note that expected and args at this point include the arg for the target itself + // so we subtract 1 here to show the number of args without the target. + logger.Printf("not enough arguments for target \"Win\", expected %v, got %v\n", expected-1, len(args.Args)-1) + os.Exit(2) + } + if args.Verbose { + logger.Println("Running target:", "Win") + } + + wrapFn := func(ctx context.Context) error { + Win() + return nil + } + ret := runTarget(logger, wrapFn) + handleError(logger, ret) + + default: + logger.Printf("Unknown target specified: %q\n", target) + os.Exit(2) + } + } +} + + + + diff --git a/schedulerMiddleware/internal/config/config.go b/schedulerMiddleware/internal/config/config.go index dfa823b..60c4e79 100644 --- a/schedulerMiddleware/internal/config/config.go +++ b/schedulerMiddleware/internal/config/config.go @@ -1,6 +1,7 @@ package config import ( + pcmhub "gitlink.org.cn/JointCloud/pcm-hub/config" "gitlink.org.cn/cloudream/common/pkgs/logger" "gitlink.org.cn/cloudream/common/sdks/blockchain" sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" @@ -11,12 +12,13 @@ import ( ) type Config struct { - Logger logger.Config `json:"logger"` - DB db.Config `json:"db"` - PCMScheduler sch.Config `json:"pcmScheduler"` - Uploader uploadersdk.Config `json:"uploader"` - BlockChain blockchain.Config `json:"blockChain"` - CloudreamStorage cdsapi.Config `json:"cloudreamStorage"` + Logger logger.Config `json:"logger"` + DB db.Config `json:"db"` + PCMScheduler sch.Config `json:"pcmScheduler"` + Uploader uploadersdk.Config `json:"uploader"` + BlockChain blockchain.Config `json:"blockChain"` + CloudreamStorage cdsapi.Config `json:"cloudreamStorage"` + PCMHub []pcmhub.PlatformConfig `json:"pcmHub"` } var cfg Config diff --git a/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go b/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go index 61fad43..29bcb6b 100644 --- a/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go +++ b/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go @@ -72,9 +72,11 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error { req := uploadersdk.UploadReq{ DataType: s.dataType, Source: &uploadersdk.UrlSource{ - Url: info.Url, + Type: sch.StorageTypeURL, + Url: info.Url, }, Target: &uploadersdk.UrlTarget{ + Type: sch.StorageTypeURL, ClusterID: uploadersdk.ClusterID(info.Cluster), JCSUploadInfo: cdsapi.ObjectUploadInfo{ UserID: s.userID, diff --git a/schedulerMiddleware/internal/manager/jobmgr/job/state2/job_create.go b/schedulerMiddleware/internal/manager/jobmgr/job/state2/job_create.go index 994722e..02e38a1 100644 --- a/schedulerMiddleware/internal/manager/jobmgr/job/state2/job_create.go +++ b/schedulerMiddleware/internal/manager/jobmgr/job/state2/job_create.go @@ -111,8 +111,8 @@ func (s *PCMJobCreate) do(rtx jobmgr.JobStateRunContext) (*sch.CreateJobResp, er } req := sch.CreateJobReq{ - //Name: s.jobInfo.Name, - //Description: s.jobInfo.Description, + Name: s.jobInfo.Name, + Description: s.jobInfo.Description, DataDistribute: dataDistribute, JobResources: s.jobInfo.JobResources, } diff --git a/schedulerMiddleware/internal/services/jobset.go b/schedulerMiddleware/internal/services/jobset.go index 34ead58..a97ffba 100644 --- a/schedulerMiddleware/internal/services/jobset.go +++ b/schedulerMiddleware/internal/services/jobset.go @@ -4,6 +4,9 @@ import ( "encoding/json" "errors" "fmt" + "gitlink.org.cn/JointCloud/pcm-hub/aikit/common/algorithm" + "gitlink.org.cn/JointCloud/pcm-hub/aikit/common/dataset" + "gitlink.org.cn/JointCloud/pcm-hub/aikit/common/model" "sort" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -432,33 +435,90 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI switch bd := info.(type) { case *sch.DatasetBinding: - jsonData, err := json.Marshal(bd) + packages, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageIDs, []int64{-2}) if err != nil { return err } - bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData)) + if len(packages) == 0 { + return fmt.Errorf("no package found") + } + + filePath := sch.Split + sch.DATASET + sch.Split + packages[0].PackageName + ds := &dataset.Dataset{Name: bd.Name, Description: bd.Description, FilePath: filePath, Category: dataset.CommonValue(bd.Category)} + resp, err := svc.hubClient.BindDataset("ModelArts", ds) + if err != nil { + return err + } + jsonData, err := json.Marshal(resp.Data) + if err != nil { + return err + } + + content, err := json.Marshal(bd) + if err != nil { + return err + } + bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData)) packageIDs = bd.PackageIDs case *sch.CodeBinding: - isCode = true - jsonData, err := json.Marshal(bd) + packages, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), []cdssdk.PackageID{bd.PackageID}, []int64{-2}) if err != nil { return err } - bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData)) + if len(packages) == 0 { + return fmt.Errorf("no package found") + } + + filePath := sch.Split + sch.CODE + sch.Split + packages[0].PackageName + code := &algorithm.Algorithm{Name: bd.Name, Description: bd.Description, Engine: "TensorFlow", CodeDir: filePath, BootFile: filePath + sch.Split + bd.FilePath, Branch: "main"} + resp, err := svc.hubClient.BindAlgorithm("ModelArts", code) + if err != nil { + return err + } + jsonData, err := json.Marshal(resp.Data) + if err != nil { + return err + } + + isCode = true + content, err := json.Marshal(bd) + if err != nil { + return err + } + bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData)) packageIDs = []cdssdk.PackageID{bd.PackageID} case *sch.ImageBinding: - jsonData, err := json.Marshal(bd) + content, err := json.Marshal(bd) if err != nil { return err } - bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData)) + bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), "") packageIDs = bd.PackageIDs case *sch.ModelBinding: - jsonData, err := json.Marshal(bd) + packages, err := svc.db.UploadData().GetByPackageID(svc.db.DefCtx(), bd.PackageIDs, []int64{-2}) if err != nil { return err } - bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(jsonData)) + if len(packages) == 0 { + return fmt.Errorf("no package found") + } + + filePath := sch.Split + sch.MODEL + sch.Split + packages[0].PackageName + md := &model.Model{Name: bd.Name, Description: bd.Description, Type: bd.Category, Version: bd.Version, Engine: model.CommonValue(bd.Env), FilePath: filePath} + resp, err := svc.hubClient.BindModel("ModelArts", md) + if err != nil { + return err + } + jsonData, err := json.Marshal(resp.Data) + if err != nil { + return err + } + + content, err := json.Marshal(bd) + if err != nil { + return err + } + bindingData = getBindingData(id, userID, bd.Type, bd.Name, string(content), string(jsonData)) packageIDs = bd.PackageIDs } @@ -492,13 +552,14 @@ func (svc *JobSetService) DataBinding(id uploadersdk.DataID, userID cdssdk.UserI return nil } -func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string) uploadersdk.Binding { +func getBindingData(id uploadersdk.DataID, userID cdssdk.UserID, dataType string, name string, content string, jsonData string) uploadersdk.Binding { bindingData := uploadersdk.Binding{ ID: id, Name: name, DataType: dataType, UserID: userID, Content: content, + JsonData: jsonData, } return bindingData } @@ -705,12 +766,17 @@ func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataT } defer schglb.CloudreamStoragePool.Release(cdsCli) - bucketID := cdssdk.BucketID(1) + bucket, err := svc.db.Access().GetBucketByUserID(svc.db.DefCtx(), userID, dataType) + if err != nil { + return fmt.Errorf("failed to get bucket: %w", err) + } else if bucket == nil { + return fmt.Errorf("bucket not found") + } // 创建package newPackage, err := cdsCli.Package().Create(cdsapi.PackageCreate{ UserID: userID, - BucketID: bucketID, + BucketID: bucket.ID, Name: name, }) if err != nil { @@ -721,7 +787,7 @@ func (svc *JobSetService) CreatePackage(userID cdssdk.UserID, name string, dataT UserID: userID, PackageID: newPackage.Package.PackageID, PackageName: name, - BucketID: bucketID, + BucketID: bucket.ID, DataType: dataType, UploadPriority: uploadPriority, } diff --git a/schedulerMiddleware/internal/services/service.go b/schedulerMiddleware/internal/services/service.go index 668aea1..7c2ed34 100644 --- a/schedulerMiddleware/internal/services/service.go +++ b/schedulerMiddleware/internal/services/service.go @@ -1,6 +1,7 @@ package services import ( + hub "gitlink.org.cn/JointCloud/pcm-hub/client" "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler2" "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr" @@ -10,12 +11,14 @@ type Service struct { preScheduler prescheduler2.PreScheduler jobMgr *jobmgr.Manager db *db.DB + hubClient *hub.Client } -func NewService(preScheduler prescheduler2.PreScheduler, jobMgr *jobmgr.Manager, db *db.DB) (*Service, error) { +func NewService(preScheduler prescheduler2.PreScheduler, jobMgr *jobmgr.Manager, db *db.DB, hubClient *hub.Client) (*Service, error) { return &Service{ preScheduler: preScheduler, jobMgr: jobMgr, db: db, + hubClient: hubClient, }, nil } diff --git a/schedulerMiddleware/internal/services/user.go b/schedulerMiddleware/internal/services/user.go index 2de69f5..80a393c 100644 --- a/schedulerMiddleware/internal/services/user.go +++ b/schedulerMiddleware/internal/services/user.go @@ -1,8 +1,12 @@ package services import ( + "fmt" sch "gitlink.org.cn/cloudream/common/sdks/pcmscheduler" cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" + "gitlink.org.cn/cloudream/common/sdks/storage/cdsapi" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + "strconv" "time" ) @@ -24,13 +28,51 @@ func (svc *UserService) QueryUser(id string) (cdssdk.UserID, error) { } func (svc *UserService) AddUser(id string, userName string) error { + cdsCli, err := schglb.CloudreamStoragePool.Acquire() + if err != nil { + return fmt.Errorf("new scheduler client: %w", err) + } + defer schglb.CloudreamStoragePool.Release(cdsCli) + + // 注册用户 + createReq := &cdsapi.UserCreate{ + Name: userName, + } + createResp, err := cdsCli.UserCreate(createReq) + if err != nil { + return fmt.Errorf("failed to create user: %w", err) + } + user := sch.User{ + ID: int64(createResp.User.UserID), SsoID: id, UserName: userName, Created: time.Now(), } - err := svc.db.Access().AddUser(svc.db.DefCtx(), user) + // 根据不同的数据类型,创建不同的桶 + var buckets []sch.Bucket + dataTypes := []string{sch.CODE, sch.DATASET, sch.MODEL, sch.IMAGE} + for _, dataType := range dataTypes { + bucketName := strconv.FormatInt(int64(createResp.User.UserID), 10) + "_" + dataType + bucketCreateReq := cdsapi.BucketCreate{ + Name: bucketName, + UserID: createResp.User.UserID, + } + + bucketCreateResp, err := cdsCli.Bucket().Create(bucketCreateReq) + if err != nil { + return err + } + + buckets = append(buckets, sch.Bucket{ + ID: bucketCreateResp.Bucket.BucketID, + UserID: createResp.User.UserID, + DataType: dataType, + }) + } + + err = svc.db.Access().AddUser(svc.db.DefCtx(), user, buckets) if err != nil { return err } diff --git a/schedulerMiddleware/main.go b/schedulerMiddleware/main.go index fda4d31..88e5830 100644 --- a/schedulerMiddleware/main.go +++ b/schedulerMiddleware/main.go @@ -2,6 +2,8 @@ package main import ( "fmt" + pcmHubClient "gitlink.org.cn/JointCloud/pcm-hub/client" + pcmHubConfig "gitlink.org.cn/JointCloud/pcm-hub/config" "gitlink.org.cn/cloudream/common/pkgs/logger" schglb "gitlink.org.cn/cloudream/scheduler/common/globals" "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" @@ -42,7 +44,15 @@ func main() { if err != nil { logger.Fatalf("new job manager failed, err: %s", err.Error()) } - svc, err := services.NewService(preSchr, jobMgr, dbSvc) + + hubConfig := &pcmHubConfig.Config{ + Platforms: config.Cfg().PCMHub, + } + hubClient, err := pcmHubClient.NewClient(hubConfig) + if err != nil { + logger.Fatalf("new pcm hub client failed, err: %s", err.Error()) + } + svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient) if err != nil { logger.Fatalf("new service failed, err: %s", err.Error()) }