diff --git a/advisor/internal/task/task.go b/advisor/internal/task/task.go index db1a6ce..15dad2c 100644 --- a/advisor/internal/task/task.go +++ b/advisor/internal/task/task.go @@ -6,7 +6,7 @@ import ( "gitlink.org.cn/cloudream/common/pkgs/task" "gitlink.org.cn/cloudream/common/utils/reflect2" - reporter "gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter" + "gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter" "gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler" advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task" ) @@ -50,7 +50,7 @@ func (m *Manager) StartByInfo(info advtsk.TaskInfo) (*Task, error) { return m.StartNew(ctor(info)), nil } -var taskFromInfoCtors map[reflect.Type]func(advtsk.TaskInfo) TaskBody = make(map[reflect.Type]func(advtsk.TaskInfo) task.TaskBody[TaskContext]) +var taskFromInfoCtors = make(map[reflect.Type]func(advtsk.TaskInfo) task.TaskBody[TaskContext]) func Register[TInfo advtsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { taskFromInfoCtors[reflect2.TypeOf[TInfo]()] = func(info advtsk.TaskInfo) TaskBody { diff --git a/client/internal/cmdline/cmdline.go b/client/internal/cmdline/cmdline.go index 2bc278d..9e2035b 100644 --- a/client/internal/cmdline/cmdline.go +++ b/client/internal/cmdline/cmdline.go @@ -12,7 +12,7 @@ type CommandContext struct { Cmdline *Commandline } -var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() +var commands = cmdtrie.NewCommandTrie[CommandContext, error]() type Commandline struct { Svc *services.Service diff --git a/common/assets/confs/middleware.json b/common/assets/confs/middleware.config.json similarity index 95% rename from common/assets/confs/middleware.json rename to common/assets/confs/middleware.config.json index e4a3a21..3da1579 100644 --- a/common/assets/confs/middleware.json +++ b/common/assets/confs/middleware.config.json @@ -5,13 +5,13 @@ "outputDirectory": "log", "level": "debug" }, - "db2": { - "address": "101.201.215.196:3306", + "db": { + "address": "localhost:7070", "account": "pcm", "password": "123456@Asd", "databaseName": "scheduler" }, - "db": { + "db1": { "address": "localhost:3306", "account": "root", "password": "123456", diff --git a/common/pkgs/mq/advisor/server.go b/common/pkgs/mq/advisor/server.go index 93c1cc9..219b7a6 100644 --- a/common/pkgs/mq/advisor/server.go +++ b/common/pkgs/mq/advisor/server.go @@ -52,7 +52,7 @@ func (s *Server) OnError(callback func(error)) { s.rabbitSvr.OnError = callback } -var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() +var msgDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { diff --git a/common/pkgs/mq/collector/server.go b/common/pkgs/mq/collector/server.go index f6fd157..14880ad 100644 --- a/common/pkgs/mq/collector/server.go +++ b/common/pkgs/mq/collector/server.go @@ -58,7 +58,7 @@ func (s *Server) OnError(callback func(error)) { s.rabbitSvr.OnError = callback } -var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() +var msgDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { diff --git a/common/pkgs/mq/executor/server.go b/common/pkgs/mq/executor/server.go index 8cf7bf2..dfd67cb 100644 --- a/common/pkgs/mq/executor/server.go +++ b/common/pkgs/mq/executor/server.go @@ -55,7 +55,7 @@ func (s *Server) OnError(callback func(error)) { s.rabbitSvr.OnError = callback } -var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() +var msgDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { diff --git a/common/pkgs/mq/manager/server.go b/common/pkgs/mq/manager/server.go index ebe683d..10f8704 100644 --- a/common/pkgs/mq/manager/server.go +++ b/common/pkgs/mq/manager/server.go @@ -62,7 +62,7 @@ func (s *Server) OnError(callback func(error)) { s.rabbitSvr.OnError = callback } -var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher() +var msgDispatcher = mq.NewMessageDispatcher() // Register 将Service中的一个接口函数作为指定类型消息的处理函数,同时会注册请求和响应的消息类型 func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any { diff --git a/common/pkgs/prescheduler2/default_prescheduler.go b/common/pkgs/prescheduler2/default_prescheduler.go index c234c57..76bab6e 100644 --- a/common/pkgs/prescheduler2/default_prescheduler.go +++ b/common/pkgs/prescheduler2/default_prescheduler.go @@ -232,7 +232,7 @@ func (s *DefaultPreScheduler) ScheduleJob(priorities []pcmsch.ResourcePriority, // 查询指定算力中心 clusterIDs := make([]schsdk.ClusterID, 0, len(clusterMapping)) - for id, _ := range clusterMapping { + for id := range clusterMapping { clusterIDs = append(clusterIDs, id) } diff --git a/executor/internal/task/create_ecs/huaweicloud.go b/executor/internal/task/create_ecs/huaweicloud.go index 6b1a237..00f0145 100644 --- a/executor/internal/task/create_ecs/huaweicloud.go +++ b/executor/internal/task/create_ecs/huaweicloud.go @@ -6,7 +6,7 @@ import ( "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" ecs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/model" - region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/region" + "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/region" ) // HuaweiCloud实现了CloudProvider接口 diff --git a/executor/internal/task/task.go b/executor/internal/task/task.go index 3ca984f..963d75b 100644 --- a/executor/internal/task/task.go +++ b/executor/internal/task/task.go @@ -71,7 +71,7 @@ func (t *Task) ID() string { return t.id } -var TaskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody = make(map[reflect.Type]func(exectsk.TaskInfo) TaskBody) +var TaskFromInfoCtors = make(map[reflect.Type]func(exectsk.TaskInfo) TaskBody) func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) { TaskFromInfoCtors[reflect2.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody { diff --git a/log/schedulerclient.log b/log/schedulerclient.log index 9bad8d1..658c31f 100644 --- a/log/schedulerclient.log +++ b/log/schedulerclient.log @@ -2196,3 +2196,163 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the 2025-02-12 16:56:48 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed 2025-02-12 16:56:48 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: 2025-02-12 16:56:48 [INFO] job set 0 completed +2025-02-12 16:56:52 [INFO] start serving http at: :7891 +2025-02-12 16:57:00 [DEBU] uploading job +2025-02-12 16:59:19 [ERRO] upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID +2025-02-12 16:59:19 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008c230 code {1 0}} &{0xc00017a000} +2025-02-12 16:59:19 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-12 16:59:19 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID +2025-02-12 16:59:19 [INFO] job set 0 completed +2025-02-13 10:13:19 [INFO] start serving http at: local +2025-02-13 10:13:19 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:19:00 [INFO] start serving http at: local +2025-02-13 10:19:00 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:20:55 [INFO] start serving http at: local +2025-02-13 10:20:55 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:21:17 [INFO] start serving http at: local +2025-02-13 10:21:17 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:22:21 [INFO] start serving http at: local +2025-02-13 10:22:21 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:22:52 [INFO] start serving http at: local +2025-02-13 10:22:52 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:24:00 [INFO] start serving http at: local +2025-02-13 10:24:00 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:24:49 [INFO] start serving http at: local +2025-02-13 10:24:49 [INFO] http stopped with error: listen tcp: address local: missing port in address +2025-02-13 10:34:37 [INFO] start serving http at: :7891 +2025-02-13 10:35:23 [INFO] start serving http at: :7891 +2025-02-13 11:16:40 [INFO] start serving http at: -c +2025-02-13 11:16:40 [INFO] http stopped with error: listen tcp: address -c: missing port in address +2025-02-13 11:25:16 [INFO] start serving http at: :7891 +2025-02-13 11:30:25 [FATA] failed to connect to database: dial tcp 101.201.215.196:3306: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. +2025-02-13 11:32:48 [INFO] start serving http at: :7891 +2025-02-13 14:01:34 [WARN] get access requests: invalid connection +2025-02-13 14:01:49 [INFO] start serving http at: :7891 +2025-02-13 14:01:58 [WARN] get access requests: failed to create user: code: OperationFailed, message: code: DataExists, message: user name already exists +2025-02-13 14:13:05 [WARN] [HTTP:JobSet.CreateFolder] creating folder: failed to create package: code: DataExists, message: package already exists +2025-02-13 14:27:43 [INFO] start serving http at: :7891 +2025-02-13 14:28:04 [DEBU] uploading job +2025-02-13 14:28:13 [ERRO] upload data: code: 500, message: +2025-02-13 14:28:13 [INFO] jobID: %s change state from %s to %s0&{1 0xc00011b450 code {1 0}} &{0xc0001de400} +2025-02-13 14:28:13 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 14:28:13 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 14:28:13 [INFO] job set 0 completed +2025-02-13 14:28:36 [DEBU] uploading job +2025-02-13 14:28:36 [ERRO] upload data: code: 500, message: +2025-02-13 14:28:36 [INFO] jobID: %s change state from %s to %s1&{1 0xc00066c960 code {1 0}} &{0xc0006844c0} +2025-02-13 14:28:36 [INFO] [JobID:1] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 14:28:36 [INFO] [JobID:1] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 14:28:36 [INFO] job set 1 completed +2025-02-13 14:29:37 [DEBU] uploading job +2025-02-13 14:29:49 [ERRO] upload data: code: 500, message: +2025-02-13 14:29:49 [INFO] jobID: %s change state from %s to %s2&{1 0xc00066caa0 code {1 0}} &{0xc000330020} +2025-02-13 14:29:49 [INFO] [JobID:2] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 14:29:49 [INFO] [JobID:2] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 14:29:49 [INFO] job set 2 completed +2025-02-13 14:31:31 [INFO] start serving http at: :7891 +2025-02-13 14:31:35 [DEBU] uploading job +2025-02-13 14:31:41 [ERRO] upload data: code: 500, message: +2025-02-13 14:31:41 [INFO] jobID: %s change state from %s to %s0&{1 0xc000274b40 code {1 0}} &{0xc000624060} +2025-02-13 14:31:41 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 14:31:41 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 14:31:41 [INFO] job set 0 completed +2025-02-13 14:33:50 [INFO] start serving http at: :7891 +2025-02-13 14:33:59 [DEBU] uploading job +2025-02-13 14:34:04 [ERRO] upload data: code: 500, message: +2025-02-13 14:34:04 [INFO] jobID: %s change state from %s to %s0&{1 0xc0001d80a0 code {1 0}} &{0xc0001de0a0} +2025-02-13 14:34:04 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 14:34:04 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 14:34:04 [INFO] job set 0 completed +2025-02-13 15:27:12 [FATA] failed to connect to database: dial tcp [::1]:7070: connectex: No connection could be made because the target machine actively refused it. +2025-02-13 15:27:51 [INFO] start serving http at: :7891 +2025-02-13 15:35:40 [INFO] start serving http at: :7891 +2025-02-13 15:35:50 [DEBU] uploading job +2025-02-13 15:36:05 [ERRO] upload data: code: 500, message: +2025-02-13 15:36:19 [INFO] jobID: %s change state from %s to %s0&{1 0xc00024b1d0 code 0xc000008a80 {1 0}} &{0xc0004c81a0} +2025-02-13 15:36:19 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:36:19 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:36:29 [INFO] job set 0 completed +2025-02-13 15:37:07 [DEBU] uploading job +2025-02-13 15:37:07 [ERRO] upload data: code: 500, message: +2025-02-13 15:37:07 [INFO] jobID: %s change state from %s to %s1&{1 0xc000166190 code 0xc00011a138 {1 0}} &{0xc000120760} +2025-02-13 15:37:07 [INFO] [JobID:1] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:37:07 [INFO] [JobID:1] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:37:07 [INFO] job set 1 completed +2025-02-13 15:37:39 [INFO] start serving http at: :7891 +2025-02-13 15:37:47 [DEBU] uploading job +2025-02-13 15:37:48 [ERRO] upload data: code: 500, message: +2025-02-13 15:37:48 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008d9f0 code 0xc000008be8 {1 0}} &{0xc0001de1a0} +2025-02-13 15:37:48 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:37:48 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:37:48 [INFO] job set 0 completed +2025-02-13 15:38:54 [INFO] start serving http at: :7891 +2025-02-13 15:39:00 [DEBU] uploading job +2025-02-13 15:39:01 [ERRO] upload data: code: 500, message: +2025-02-13 15:39:01 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008c3c0 code 0xc00061c390 {1 0}} &{0xc00060b580} +2025-02-13 15:39:01 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:39:01 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:39:01 [INFO] job set 0 completed +2025-02-13 15:40:51 [INFO] start serving http at: :7891 +2025-02-13 15:41:02 [DEBU] uploading job +2025-02-13 15:41:03 [ERRO] upload data: code: 500, message: +2025-02-13 15:41:03 [INFO] jobID: %s change state from %s to %s0&{1 0xc0006a1bd0 code 0xc0000086f0 {1 0}} &{0xc000248200} +2025-02-13 15:41:03 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:41:03 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:41:03 [INFO] job set 0 completed +2025-02-13 15:42:00 [INFO] start serving http at: :7891 +2025-02-13 15:42:04 [DEBU] uploading job +2025-02-13 15:42:05 [ERRO] upload data: code: 500, message: +2025-02-13 15:42:05 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008c5f0 code 0xc00012c6f0 {1 0}} &{0xc00070ac60} +2025-02-13 15:42:05 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:42:05 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:42:05 [INFO] job set 0 completed +2025-02-13 15:44:01 [INFO] start serving http at: :7891 +2025-02-13 15:44:07 [DEBU] uploading job +2025-02-13 15:44:08 [ERRO] upload data: code: 500, message: +2025-02-13 15:44:08 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008d8b0 code 0xc000008bd0 {1 0}} &{0xc0001ae360} +2025-02-13 15:44:08 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:44:08 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:44:08 [INFO] job set 0 completed +2025-02-13 15:45:10 [INFO] start serving http at: :7891 +2025-02-13 15:45:44 [INFO] start serving http at: :7891 +2025-02-13 15:45:52 [DEBU] uploading job +2025-02-13 15:45:55 [ERRO] upload data: code: 500, message: +2025-02-13 15:45:55 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008d130 code 0xc000120a80 {1 0}} &{0xc00022e2a0} +2025-02-13 15:45:55 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:45:55 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:45:55 [INFO] job set 0 completed +2025-02-13 15:46:57 [INFO] start serving http at: :7891 +2025-02-13 15:47:02 [DEBU] uploading job +2025-02-13 15:47:06 [ERRO] upload data: code: 500, message: +2025-02-13 15:47:06 [INFO] jobID: %s change state from %s to %s0&{1 0xc0004bf8b0 code 0xc000008c78 {1 0}} &{0xc0001de3e0} +2025-02-13 15:47:06 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:47:06 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:47:06 [INFO] job set 0 completed +2025-02-13 15:49:04 [INFO] start serving http at: :7891 +2025-02-13 15:49:07 [DEBU] uploading job +2025-02-13 15:49:08 [ERRO] upload data: code: 500, message: +2025-02-13 15:49:08 [INFO] jobID: %s change state from %s to %s0&{1 0xc0002741e0 code 0xc00064c3a8 {1 0}} &{0xc0002ca1e0} +2025-02-13 15:49:08 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:49:08 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:49:08 [INFO] job set 0 completed +2025-02-13 15:54:10 [INFO] start serving http at: :7891 +2025-02-13 15:54:20 [DEBU] uploading job +2025-02-13 15:54:21 [ERRO] upload data: code: 500, message: +2025-02-13 15:54:21 [INFO] jobID: %s change state from %s to %s0&{1 0xc00024a6e0 code 0xc000114a80 {1 0}} &{0xc00011a7c0} +2025-02-13 15:54:21 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:54:21 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:54:21 [INFO] job set 0 completed +2025-02-13 15:54:56 [INFO] start serving http at: :7891 +2025-02-13 15:55:00 [DEBU] uploading job +2025-02-13 15:55:01 [ERRO] upload data: code: 500, message: +2025-02-13 15:55:01 [INFO] jobID: %s change state from %s to %s0&{1 0xc0004cce60 code 0xc0004f0468 {1 0}} &{0xc0001de1a0} +2025-02-13 15:55:01 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:55:01 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:55:01 [INFO] job set 0 completed +2025-02-13 15:55:31 [INFO] start serving http at: :7891 +2025-02-13 15:55:34 [DEBU] uploading job +2025-02-13 15:55:35 [ERRO] upload data: code: 500, message: +2025-02-13 15:55:35 [INFO] jobID: %s change state from %s to %s0&{1 0xc00064a6e0 code 0xc000008738 {1 0}} &{0xc00051b360} +2025-02-13 15:55:35 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed +2025-02-13 15:55:35 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message: +2025-02-13 15:55:35 [INFO] job set 0 completed diff --git a/schedulerMiddleware/internal/cmdline/cmdline.go b/schedulerMiddleware/internal/cmdline/cmdline.go index 40fe476..3f9234c 100644 --- a/schedulerMiddleware/internal/cmdline/cmdline.go +++ b/schedulerMiddleware/internal/cmdline/cmdline.go @@ -2,6 +2,7 @@ package cmdline import ( "fmt" + "github.com/spf13/cobra" "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/services" "os" @@ -12,7 +13,7 @@ type CommandContext struct { Cmdline *Commandline } -var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]() +var commands = cmdtrie.NewCommandTrie[CommandContext, error]() type Commandline struct { Svc *services.Service @@ -43,3 +44,21 @@ func MustAddCmd(fn any, prefixWords ...string) any { commands.MustAdd(fn, prefixWords...) return nil } + +var RootCmd = &cobra.Command{ + Use: "middleware", + Short: "scheduler middleware", + Long: `scheduler middleware`, +} + +func init() { + var configPath string + var address string + + RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file") + RootCmd.Flags().StringVarP(&address, "listen", "l", ":7891", "listen address") + + RootCmd.Run = func(cmd *cobra.Command, args []string) { + serve(configPath, address) + } +} diff --git a/schedulerMiddleware/internal/cmdline/serve.go b/schedulerMiddleware/internal/cmdline/serve.go index 4418677..291746a 100644 --- a/schedulerMiddleware/internal/cmdline/serve.go +++ b/schedulerMiddleware/internal/cmdline/serve.go @@ -2,7 +2,17 @@ package cmdline import ( "fmt" + pcmHubClient "gitlink.org.cn/JointCloud/pcm-hub/client" + pcmHubConfig "gitlink.org.cn/JointCloud/pcm-hub/config" + "gitlink.org.cn/cloudream/common/pkgs/logger" + schglb "gitlink.org.cn/cloudream/scheduler/common/globals" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" + "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler2" + "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/config" "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/http" + "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr" + "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/services" + "os" ) var _ = MustAddCmd(func(ctx CommandContext, args []string) error { @@ -23,3 +33,58 @@ var _ = MustAddCmd(func(ctx CommandContext, args []string) error { return nil }, "serve", "http") + +func serve(configPath string, address string) { + err := config.Init(configPath) + if err != nil { + fmt.Printf("init config failed, err: %s", err.Error()) + os.Exit(1) + } + + err = logger.Init(&config.Cfg().Logger) + if err != nil { + fmt.Printf("init logger failed, err: %s", err.Error()) + os.Exit(1) + } + + schglb.InitPCMSchePool(&config.Cfg().PCMScheduler) + schglb.InitUploaderPool(&config.Cfg().Uploader) + schglb.InitBlockChainPool(&config.Cfg().BlockChain) + schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage) + + dbSvc, err := db.NewDB(&config.Cfg().DB) + if err != nil { + logger.Fatalf("new db2 failed, err: %s", err.Error()) + } + + preSchr := prescheduler2.NewDefaultPreScheduler() + nodeSvc := jobmgr.NewNodeService() + jobMgr, err := jobmgr.NewManager(dbSvc, nodeSvc) + if err != nil { + logger.Fatalf("new job manager failed, err: %s", err.Error()) + } + + hubConfig := &pcmHubConfig.Config{ + Platforms: config.Cfg().PCMHub, + } + hubClient, err := pcmHubClient.NewClient(hubConfig) + if err != nil { + logger.Fatalf("new pcm hub client failed, err: %s", err.Error()) + } + svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient) + if err != nil { + logger.Fatalf("new service failed, err: %s", err.Error()) + } + + httpSvr, err := http.NewServer(address, svc) + if err != nil { + logger.Errorf(err.Error()) + os.Exit(1) + } + + err = httpSvr.Serve() + if err != nil { + logger.Errorf(err.Error()) + os.Exit(1) + } +} diff --git a/schedulerMiddleware/internal/config/config.go b/schedulerMiddleware/internal/config/config.go index 60c4e79..971a15d 100644 --- a/schedulerMiddleware/internal/config/config.go +++ b/schedulerMiddleware/internal/config/config.go @@ -23,8 +23,12 @@ type Config struct { var cfg Config -func Init() error { - return config.DefaultLoad("middleware", &cfg) +func Init(path string) error { + if path == "" { + return config.DefaultLoad("middleware", &cfg) + } + + return config.Load(path, &cfg) } func Cfg() *Config { diff --git a/schedulerMiddleware/internal/http/jobset.go b/schedulerMiddleware/internal/http/jobset.go index efc07a0..da99608 100644 --- a/schedulerMiddleware/internal/http/jobset.go +++ b/schedulerMiddleware/internal/http/jobset.go @@ -9,6 +9,7 @@ import ( cdssdk "gitlink.org.cn/cloudream/common/sdks/storage" uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader" "gitlink.org.cn/cloudream/common/utils/serder" + jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task" "io" "net/http" ) @@ -124,16 +125,43 @@ func (s *JobSetService) Upload(ctx *gin.Context) { } blockChainToken := ctx.Request.Header.Get("Authorization") + task := jobTask.NewJobTask[sch.TaskMessage]() - _, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken) + _, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken, task) if err != nil { log.Warnf("uploading file: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "upload file failed, error: "+err.Error())) return } - ctx.JSON(http.StatusOK, OK("success")) + //ctx.JSON(http.StatusOK, OK("success")) + flusher, ok := ctx.Writer.(http.Flusher) + if !ok { + http.Error(ctx.Writer, "Streaming not supported", http.StatusInternalServerError) + return + } + + ctx.Writer.Header().Set("Cache-Control", "no-cache") + ctx.Writer.Header().Set("Content-Type", "text/event-stream") + + for { + fut := task.Receive() + receive := <-fut.Chan() + + if receive.Value.Status == sch.FailedStatus || receive.Value.Status == sch.SuccessStatus { + //i := 1 + data := "event: message\ndata: " + receive.Value.Message + "\n\n" + _, err = ctx.Writer.Write([]byte(data)) + println(receive.Value.Message) + if err != nil { + break + } + flusher.Flush() + ctx.Writer.CloseNotify() + return + } + } } type CreateFolderReq struct { @@ -343,6 +371,7 @@ func (s *JobSetService) RemoveBinding(ctx *gin.Context) { type QueryBindingReq struct { DataType string `json:"dataType" binding:"required"` Param sch.QueryBindingDataParam `json:"param" binding:"required"` + Filters sch.QueryBindingFilters `json:"filters"` } type QueryBindingResp struct { @@ -365,7 +394,7 @@ func (s *JobSetService) QueryBinding(ctx *gin.Context) { return } - details, err := s.svc.JobSetSvc().QueryBinding(req.DataType, req.Param) + details, err := s.svc.JobSetSvc().QueryBinding(req.DataType, req.Param, req.Filters) if err != nil { log.Warnf("getting service list: %s", err.Error()) ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "binding data failed, error: "+err.Error())) diff --git a/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go b/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go index 29bcb6b..cefbbcd 100644 --- a/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go +++ b/schedulerMiddleware/internal/manager/jobmgr/job/state2/data_upload.go @@ -14,6 +14,7 @@ import ( jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job" "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr" "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/job/state" + jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task" "strconv" "strings" "sync" @@ -26,16 +27,17 @@ type DataUpload struct { dataType string blockChainToken string //storages []cdssdk.StorageID + task *jobTask.JobTask[sch.TaskMessage] lock sync.Mutex } -func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string) *DataUpload { +func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) *DataUpload { return &DataUpload{ userID: userID, uploadInfo: uploadInfo, dataType: dataType, blockChainToken: blockChainToken, - //storages: storages, + task: task, } } @@ -86,12 +88,22 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error { } uploadResp, err := uploaderCli.Upload(req) if err != nil { + message := sch.TaskMessage{ + Status: sch.FailedStatus, + Message: fmt.Sprintf("upload data: %w", err), + } + s.task.Send(message) return fmt.Errorf("upload data: %w", err) } if uploadResp.JsonData != "" { err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), uploadResp.PackageID, uploadResp.JsonData, -1) if err != nil { + message := sch.TaskMessage{ + Status: sch.FailedStatus, + Message: fmt.Sprintf("update package: %w", err), + } + s.task.Send(message) return fmt.Errorf("update package: %w", err) } } @@ -100,6 +112,12 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error { } + message := sch.TaskMessage{ + Status: sch.SuccessStatus, + Message: "upload success!", + } + s.task.Send(message) + // 传入存证 blockChains, err := s.blockChain(objectIDs) if err != nil { diff --git a/schedulerMiddleware/internal/manager/task/task.go b/schedulerMiddleware/internal/manager/task/task.go index bfc68e6..05de3a5 100644 --- a/schedulerMiddleware/internal/manager/task/task.go +++ b/schedulerMiddleware/internal/manager/task/task.go @@ -3,6 +3,9 @@ package jobTask import ( "fmt" "gitlink.org.cn/cloudream/common/pkgs/async" + + //"gitlink.org.cn/cloudream/common/pkgs/async" + //"gitlink.org.cn/cloudream/common/utils/sync2" "gitlink.org.cn/cloudream/common/pkgs/future" "gitlink.org.cn/cloudream/common/pkgs/logger" "math/rand" @@ -37,9 +40,13 @@ func (c *JobTask[T]) Receive() future.Future1[T] { return c.taskChan.Receive() } -func (c *JobTask[T]) Send(info any) { - - logger.Info("send http") +func (c *JobTask[T]) Send(info T) { + err := c.taskChan.Send(info) + if err != nil { + logger.Error(err.Error()) + return + } + //logger.Info("send http") } func (c *JobTask[T]) Chan() *async.UnboundChannel[T] { diff --git a/schedulerMiddleware/internal/services/jobset.go b/schedulerMiddleware/internal/services/jobset.go index a97ffba..a8ffc20 100644 --- a/schedulerMiddleware/internal/services/jobset.go +++ b/schedulerMiddleware/internal/services/jobset.go @@ -7,6 +7,7 @@ import ( "gitlink.org.cn/JointCloud/pcm-hub/aikit/common/algorithm" "gitlink.org.cn/JointCloud/pcm-hub/aikit/common/dataset" "gitlink.org.cn/JointCloud/pcm-hub/aikit/common/model" + jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task" "sort" "gitlink.org.cn/cloudream/common/pkgs/logger" @@ -50,7 +51,7 @@ func (svc *JobSetService) PreScheduler(jobSet schsdk.JobSetInfo) (*jobmod.JobSet return schScheme, uploadScheme, nil } -func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, blockChainToken string) (*schsdk.JobSetID, error) { +func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) (*schsdk.JobSetID, error) { logger.Debugf("uploading job") // 查询数据库里维护的集群 @@ -101,7 +102,7 @@ func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, jo := job.NewNormalJob(schsdk.NormalJobInfo{}) jobs = append(jobs, jobmgr.SubmittingJob{ Body: jo, - InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken), + InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken, task), }) jobSetID := svc.jobMgr.SubmitJobSet(jobs) @@ -595,7 +596,7 @@ func (svc *JobSetService) DeleteBinding(IDs []int64) error { return nil } -func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDataParam) ([]uploadersdk.BindingDetail, error) { +func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDataParam, filters sch.QueryBindingFilters) ([]uploadersdk.BindingDetail, error) { switch p := param.(type) { case *sch.PrivateLevel: @@ -640,6 +641,15 @@ func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDa for _, data := range datas { var info sch.DataBinding + + // 如果有指定状态,则只展示指定状态的数据 + if filters.Status != "" && data.Status != filters.Status { + continue + } + if filters.Name != "" && data.Name != filters.Name { + continue + } + // 只有approved状态的数据才能看到详情 if data.Status == sch.ApprovedStatus { binding := uploadersdk.Binding{ diff --git a/schedulerMiddleware/internal/services/service.go b/schedulerMiddleware/internal/services/service.go index 7c2ed34..a3c3208 100644 --- a/schedulerMiddleware/internal/services/service.go +++ b/schedulerMiddleware/internal/services/service.go @@ -12,6 +12,7 @@ type Service struct { jobMgr *jobmgr.Manager db *db.DB hubClient *hub.Client + //JobTask *jobTask.JobTask[string] } func NewService(preScheduler prescheduler2.PreScheduler, jobMgr *jobmgr.Manager, db *db.DB, hubClient *hub.Client) (*Service, error) { diff --git a/schedulerMiddleware/main.go b/schedulerMiddleware/main.go index 88e5830..cef444b 100644 --- a/schedulerMiddleware/main.go +++ b/schedulerMiddleware/main.go @@ -1,67 +1,9 @@ package main import ( - "fmt" - pcmHubClient "gitlink.org.cn/JointCloud/pcm-hub/client" - pcmHubConfig "gitlink.org.cn/JointCloud/pcm-hub/config" - "gitlink.org.cn/cloudream/common/pkgs/logger" - schglb "gitlink.org.cn/cloudream/scheduler/common/globals" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/db" - "gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler2" - "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/cmdline" - "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/config" - "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr" - "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/services" - "os" + cmd "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/cmdline" ) func main() { - err := config.Init() - if err != nil { - fmt.Printf("init config failed, err: %s", err.Error()) - os.Exit(1) - } - - err = logger.Init(&config.Cfg().Logger) - if err != nil { - fmt.Printf("init logger failed, err: %s", err.Error()) - os.Exit(1) - } - - schglb.InitPCMSchePool(&config.Cfg().PCMScheduler) - schglb.InitUploaderPool(&config.Cfg().Uploader) - schglb.InitBlockChainPool(&config.Cfg().BlockChain) - schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage) - - dbSvc, err := db.NewDB(&config.Cfg().DB) - if err != nil { - logger.Fatalf("new db2 failed, err: %s", err.Error()) - } - - preSchr := prescheduler2.NewDefaultPreScheduler() - nodeSvc := jobmgr.NewNodeService() - jobMgr, err := jobmgr.NewManager(dbSvc, nodeSvc) - if err != nil { - logger.Fatalf("new job manager failed, err: %s", err.Error()) - } - - hubConfig := &pcmHubConfig.Config{ - Platforms: config.Cfg().PCMHub, - } - hubClient, err := pcmHubClient.NewClient(hubConfig) - if err != nil { - logger.Fatalf("new pcm hub client failed, err: %s", err.Error()) - } - svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient) - if err != nil { - logger.Fatalf("new service failed, err: %s", err.Error()) - } - - cmds, err := cmdline.NewCommandline(svc) - if err != nil { - logger.Warnf("new command line failed, err: %s", err.Error()) - os.Exit(1) - } - - cmds.DispatchCommand(os.Args[1:]) + cmd.RootCmd.Execute() }