Adjust the startup method

JeshuaRen 2025-02-13 16:04:43 +08:00
parent fa797c1774
commit 8c1ff7b360
20 changed files with 342 additions and 87 deletions

View File

@@ -6,7 +6,7 @@ import (
	"gitlink.org.cn/cloudream/common/pkgs/task"
	"gitlink.org.cn/cloudream/common/utils/reflect2"
-	reporter "gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
+	"gitlink.org.cn/cloudream/scheduler/advisor/internal/reporter"
	"gitlink.org.cn/cloudream/scheduler/advisor/internal/scheduler"
	advtsk "gitlink.org.cn/cloudream/scheduler/common/pkgs/mq/advisor/task"
)
@@ -50,7 +50,7 @@ func (m *Manager) StartByInfo(info advtsk.TaskInfo) (*Task, error) {
	return m.StartNew(ctor(info)), nil
}

-var taskFromInfoCtors map[reflect.Type]func(advtsk.TaskInfo) TaskBody = make(map[reflect.Type]func(advtsk.TaskInfo) task.TaskBody[TaskContext])
+var taskFromInfoCtors = make(map[reflect.Type]func(advtsk.TaskInfo) task.TaskBody[TaskContext])

func Register[TInfo advtsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
	taskFromInfoCtors[reflect2.TypeOf[TInfo]()] = func(info advtsk.TaskInfo) TaskBody {
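Both the advisor and executor task managers use the same registration pattern: a package-level map keyed by the reflect.Type of a task-info struct, filled through a generic Register helper. The change above only drops a redundant explicit map type from the var declaration; behaviour is unchanged. For readers unfamiliar with the pattern, a self-contained sketch follows — all names are illustrative, not code from this repository, and it uses the standard reflect package instead of the project's reflect2 helper.

package main

import (
	"fmt"
	"reflect"
)

type TaskInfo interface{ isTaskInfo() }

type TaskBody interface{ Run() }

type PingInfo struct{ Target string }

func (PingInfo) isTaskInfo() {}

type PingTask struct{ info PingInfo }

func (t PingTask) Run() { fmt.Println("ping", t.info.Target) }

// ctors maps the concrete info type to a constructor for the matching task body.
var ctors = make(map[reflect.Type]func(TaskInfo) TaskBody)

// Register wraps a strongly typed constructor in an untyped adapter.
func Register[TInfo TaskInfo, TBody TaskBody](ctor func(TInfo) TBody) {
	ctors[reflect.TypeOf((*TInfo)(nil)).Elem()] = func(info TaskInfo) TaskBody {
		return ctor(info.(TInfo))
	}
}

func main() {
	Register(func(i PingInfo) PingTask { return PingTask{info: i} })

	var info TaskInfo = PingInfo{Target: "example"}
	body := ctors[reflect.TypeOf(info)](info)
	body.Run() // ping example
}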

View File

@@ -12,7 +12,7 @@ type CommandContext struct {
	Cmdline *Commandline
}

-var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]()
+var commands = cmdtrie.NewCommandTrie[CommandContext, error]()

type Commandline struct {
	Svc *services.Service

View File

@@ -5,13 +5,13 @@
		"outputDirectory": "log",
		"level": "debug"
	},
-	"db2": {
-		"address": "101.201.215.196:3306",
+	"db": {
+		"address": "localhost:7070",
		"account": "pcm",
		"password": "123456@Asd",
		"databaseName": "scheduler"
	},
-	"db": {
+	"db1": {
		"address": "localhost:3306",
		"account": "root",
		"password": "123456",

View File

@@ -52,7 +52,7 @@ func (s *Server) OnError(callback func(error)) {
	s.rabbitSvr.OnError = callback
}

-var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
+var msgDispatcher = mq.NewMessageDispatcher()

// Register registers one of Service's interface functions as the handler for the specified message type; it also registers the request and response message types.
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {

View File

@@ -58,7 +58,7 @@ func (s *Server) OnError(callback func(error)) {
	s.rabbitSvr.OnError = callback
}

-var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
+var msgDispatcher = mq.NewMessageDispatcher()

// Register registers one of Service's interface functions as the handler for the specified message type; it also registers the request and response message types.
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {

View File

@@ -55,7 +55,7 @@ func (s *Server) OnError(callback func(error)) {
	s.rabbitSvr.OnError = callback
}

-var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
+var msgDispatcher = mq.NewMessageDispatcher()

// Register registers one of Service's interface functions as the handler for the specified message type; it also registers the request and response message types.
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {

View File

@@ -62,7 +62,7 @@ func (s *Server) OnError(callback func(error)) {
	s.rabbitSvr.OnError = callback
}

-var msgDispatcher mq.MessageDispatcher = mq.NewMessageDispatcher()
+var msgDispatcher = mq.NewMessageDispatcher()

// Register registers one of Service's interface functions as the handler for the specified message type; it also registers the request and response message types.
func Register[TReq mq.MessageBody, TResp mq.MessageBody](svcFn func(svc Service, msg TReq) (TResp, *mq.CodeMessage)) any {

View File

@@ -232,7 +232,7 @@ func (s *DefaultPreScheduler) ScheduleJob(priorities []pcmsch.ResourcePriority,
	// Query the specified computing centers
	clusterIDs := make([]schsdk.ClusterID, 0, len(clusterMapping))
-	for id, _ := range clusterMapping {
+	for id := range clusterMapping {
		clusterIDs = append(clusterIDs, id)
	}

View File

@@ -6,7 +6,7 @@ import (
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
	ecs "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/model"
-	region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/region"
+	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/ecs/v2/region"
)

// HuaweiCloud implements the CloudProvider interface

View File

@@ -71,7 +71,7 @@ func (t *Task) ID() string {
	return t.id
}

-var TaskFromInfoCtors map[reflect.Type]func(exectsk.TaskInfo) TaskBody = make(map[reflect.Type]func(exectsk.TaskInfo) TaskBody)
+var TaskFromInfoCtors = make(map[reflect.Type]func(exectsk.TaskInfo) TaskBody)

func Register[TInfo exectsk.TaskInfo, TTaskBody TaskBody](ctor func(info TInfo) TTaskBody) {
	TaskFromInfoCtors[reflect2.TypeOf[TInfo]()] = func(info exectsk.TaskInfo) TaskBody {

View File

@@ -2196,3 +2196,163 @@ Key: 'QueryUploadedReq.UserID' Error:Field validation for 'UserID' failed on the
2025-02-12 16:56:48 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-12 16:56:48 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-12 16:56:48 [INFO] job set 0 completed
2025-02-12 16:56:52 [INFO] start serving http at: :7891
2025-02-12 16:57:00 [DEBU] uploading job
2025-02-12 16:59:19 [ERRO] upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID
2025-02-12 16:59:19 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008c230 code {1 0}} &{0xc00017a000}
2025-02-12 16:59:19 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-12 16:59:19 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: parsing response: json: cannot unmarshal number into Go struct field UploadResp.data.clusterID of type uploadersdk.ClusterID
2025-02-12 16:59:19 [INFO] job set 0 completed
2025-02-13 10:13:19 [INFO] start serving http at: local
2025-02-13 10:13:19 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:19:00 [INFO] start serving http at: local
2025-02-13 10:19:00 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:20:55 [INFO] start serving http at: local
2025-02-13 10:20:55 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:21:17 [INFO] start serving http at: local
2025-02-13 10:21:17 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:22:21 [INFO] start serving http at: local
2025-02-13 10:22:21 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:22:52 [INFO] start serving http at: local
2025-02-13 10:22:52 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:24:00 [INFO] start serving http at: local
2025-02-13 10:24:00 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:24:49 [INFO] start serving http at: local
2025-02-13 10:24:49 [INFO] http stopped with error: listen tcp: address local: missing port in address
2025-02-13 10:34:37 [INFO] start serving http at: :7891
2025-02-13 10:35:23 [INFO] start serving http at: :7891
2025-02-13 11:16:40 [INFO] start serving http at: -c
2025-02-13 11:16:40 [INFO] http stopped with error: listen tcp: address -c: missing port in address
2025-02-13 11:25:16 [INFO] start serving http at: :7891
2025-02-13 11:30:25 [FATA] failed to connect to database: dial tcp 101.201.215.196:3306: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.
2025-02-13 11:32:48 [INFO] start serving http at: :7891
2025-02-13 14:01:34 [WARN] get access requests: invalid connection
2025-02-13 14:01:49 [INFO] start serving http at: :7891
2025-02-13 14:01:58 [WARN] get access requests: failed to create user: code: OperationFailed, message: code: DataExists, message: user name already exists
2025-02-13 14:13:05 [WARN] [HTTP:JobSet.CreateFolder] creating folder: failed to create package: code: DataExists, message: package already exists
2025-02-13 14:27:43 [INFO] start serving http at: :7891
2025-02-13 14:28:04 [DEBU] uploading job
2025-02-13 14:28:13 [ERRO] upload data: code: 500, message:
2025-02-13 14:28:13 [INFO] jobID: %s change state from %s to %s0&{1 0xc00011b450 code {1 0}} &{0xc0001de400}
2025-02-13 14:28:13 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 14:28:13 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 14:28:13 [INFO] job set 0 completed
2025-02-13 14:28:36 [DEBU] uploading job
2025-02-13 14:28:36 [ERRO] upload data: code: 500, message:
2025-02-13 14:28:36 [INFO] jobID: %s change state from %s to %s1&{1 0xc00066c960 code {1 0}} &{0xc0006844c0}
2025-02-13 14:28:36 [INFO] [JobID:1] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 14:28:36 [INFO] [JobID:1] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 14:28:36 [INFO] job set 1 completed
2025-02-13 14:29:37 [DEBU] uploading job
2025-02-13 14:29:49 [ERRO] upload data: code: 500, message:
2025-02-13 14:29:49 [INFO] jobID: %s change state from %s to %s2&{1 0xc00066caa0 code {1 0}} &{0xc000330020}
2025-02-13 14:29:49 [INFO] [JobID:2] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 14:29:49 [INFO] [JobID:2] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 14:29:49 [INFO] job set 2 completed
2025-02-13 14:31:31 [INFO] start serving http at: :7891
2025-02-13 14:31:35 [DEBU] uploading job
2025-02-13 14:31:41 [ERRO] upload data: code: 500, message:
2025-02-13 14:31:41 [INFO] jobID: %s change state from %s to %s0&{1 0xc000274b40 code {1 0}} &{0xc000624060}
2025-02-13 14:31:41 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 14:31:41 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 14:31:41 [INFO] job set 0 completed
2025-02-13 14:33:50 [INFO] start serving http at: :7891
2025-02-13 14:33:59 [DEBU] uploading job
2025-02-13 14:34:04 [ERRO] upload data: code: 500, message:
2025-02-13 14:34:04 [INFO] jobID: %s change state from %s to %s0&{1 0xc0001d80a0 code {1 0}} &{0xc0001de0a0}
2025-02-13 14:34:04 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 14:34:04 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 14:34:04 [INFO] job set 0 completed
2025-02-13 15:27:12 [FATA] failed to connect to database: dial tcp [::1]:7070: connectex: No connection could be made because the target machine actively refused it.
2025-02-13 15:27:51 [INFO] start serving http at: :7891
2025-02-13 15:35:40 [INFO] start serving http at: :7891
2025-02-13 15:35:50 [DEBU] uploading job
2025-02-13 15:36:05 [ERRO] upload data: code: 500, message:
2025-02-13 15:36:19 [INFO] jobID: %s change state from %s to %s0&{1 0xc00024b1d0 code 0xc000008a80 {1 0}} &{0xc0004c81a0}
2025-02-13 15:36:19 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:36:19 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:36:29 [INFO] job set 0 completed
2025-02-13 15:37:07 [DEBU] uploading job
2025-02-13 15:37:07 [ERRO] upload data: code: 500, message:
2025-02-13 15:37:07 [INFO] jobID: %s change state from %s to %s1&{1 0xc000166190 code 0xc00011a138 {1 0}} &{0xc000120760}
2025-02-13 15:37:07 [INFO] [JobID:1] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:37:07 [INFO] [JobID:1] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:37:07 [INFO] job set 1 completed
2025-02-13 15:37:39 [INFO] start serving http at: :7891
2025-02-13 15:37:47 [DEBU] uploading job
2025-02-13 15:37:48 [ERRO] upload data: code: 500, message:
2025-02-13 15:37:48 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008d9f0 code 0xc000008be8 {1 0}} &{0xc0001de1a0}
2025-02-13 15:37:48 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:37:48 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:37:48 [INFO] job set 0 completed
2025-02-13 15:38:54 [INFO] start serving http at: :7891
2025-02-13 15:39:00 [DEBU] uploading job
2025-02-13 15:39:01 [ERRO] upload data: code: 500, message:
2025-02-13 15:39:01 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008c3c0 code 0xc00061c390 {1 0}} &{0xc00060b580}
2025-02-13 15:39:01 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:39:01 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:39:01 [INFO] job set 0 completed
2025-02-13 15:40:51 [INFO] start serving http at: :7891
2025-02-13 15:41:02 [DEBU] uploading job
2025-02-13 15:41:03 [ERRO] upload data: code: 500, message:
2025-02-13 15:41:03 [INFO] jobID: %s change state from %s to %s0&{1 0xc0006a1bd0 code 0xc0000086f0 {1 0}} &{0xc000248200}
2025-02-13 15:41:03 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:41:03 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:41:03 [INFO] job set 0 completed
2025-02-13 15:42:00 [INFO] start serving http at: :7891
2025-02-13 15:42:04 [DEBU] uploading job
2025-02-13 15:42:05 [ERRO] upload data: code: 500, message:
2025-02-13 15:42:05 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008c5f0 code 0xc00012c6f0 {1 0}} &{0xc00070ac60}
2025-02-13 15:42:05 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:42:05 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:42:05 [INFO] job set 0 completed
2025-02-13 15:44:01 [INFO] start serving http at: :7891
2025-02-13 15:44:07 [DEBU] uploading job
2025-02-13 15:44:08 [ERRO] upload data: code: 500, message:
2025-02-13 15:44:08 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008d8b0 code 0xc000008bd0 {1 0}} &{0xc0001ae360}
2025-02-13 15:44:08 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:44:08 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:44:08 [INFO] job set 0 completed
2025-02-13 15:45:10 [INFO] start serving http at: :7891
2025-02-13 15:45:44 [INFO] start serving http at: :7891
2025-02-13 15:45:52 [DEBU] uploading job
2025-02-13 15:45:55 [ERRO] upload data: code: 500, message:
2025-02-13 15:45:55 [INFO] jobID: %s change state from %s to %s0&{1 0xc00008d130 code 0xc000120a80 {1 0}} &{0xc00022e2a0}
2025-02-13 15:45:55 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:45:55 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:45:55 [INFO] job set 0 completed
2025-02-13 15:46:57 [INFO] start serving http at: :7891
2025-02-13 15:47:02 [DEBU] uploading job
2025-02-13 15:47:06 [ERRO] upload data: code: 500, message:
2025-02-13 15:47:06 [INFO] jobID: %s change state from %s to %s0&{1 0xc0004bf8b0 code 0xc000008c78 {1 0}} &{0xc0001de3e0}
2025-02-13 15:47:06 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:47:06 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:47:06 [INFO] job set 0 completed
2025-02-13 15:49:04 [INFO] start serving http at: :7891
2025-02-13 15:49:07 [DEBU] uploading job
2025-02-13 15:49:08 [ERRO] upload data: code: 500, message:
2025-02-13 15:49:08 [INFO] jobID: %s change state from %s to %s0&{1 0xc0002741e0 code 0xc00064c3a8 {1 0}} &{0xc0002ca1e0}
2025-02-13 15:49:08 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:49:08 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:49:08 [INFO] job set 0 completed
2025-02-13 15:54:10 [INFO] start serving http at: :7891
2025-02-13 15:54:20 [DEBU] uploading job
2025-02-13 15:54:21 [ERRO] upload data: code: 500, message:
2025-02-13 15:54:21 [INFO] jobID: %s change state from %s to %s0&{1 0xc00024a6e0 code 0xc000114a80 {1 0}} &{0xc00011a7c0}
2025-02-13 15:54:21 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:54:21 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:54:21 [INFO] job set 0 completed
2025-02-13 15:54:56 [INFO] start serving http at: :7891
2025-02-13 15:55:00 [DEBU] uploading job
2025-02-13 15:55:01 [ERRO] upload data: code: 500, message:
2025-02-13 15:55:01 [INFO] jobID: %s change state from %s to %s0&{1 0xc0004cce60 code 0xc0004f0468 {1 0}} &{0xc0001de1a0}
2025-02-13 15:55:01 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:55:01 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:55:01 [INFO] job set 0 completed
2025-02-13 15:55:31 [INFO] start serving http at: :7891
2025-02-13 15:55:34 [DEBU] uploading job
2025-02-13 15:55:35 [ERRO] upload data: code: 500, message:
2025-02-13 15:55:35 [INFO] jobID: %s change state from %s to %s0&{1 0xc00064a6e0 code 0xc000008738 {1 0}} &{0xc00051b360}
2025-02-13 15:55:35 [INFO] [JobID:0] state changed: *state2.DataUpload -> *state.Completed
2025-02-13 15:55:35 [INFO] [JobID:0] [LastState:*state2.DataUpload] job failed with: upload data: code: 500, message:
2025-02-13 15:55:35 [INFO] job set 0 completed

View File

@@ -2,6 +2,7 @@ package cmdline

import (
	"fmt"
	"github.com/spf13/cobra"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/services"
	"os"
@@ -12,7 +13,7 @@ type CommandContext struct {
	Cmdline *Commandline
}

-var commands cmdtrie.CommandTrie[CommandContext, error] = cmdtrie.NewCommandTrie[CommandContext, error]()
+var commands = cmdtrie.NewCommandTrie[CommandContext, error]()

type Commandline struct {
	Svc *services.Service
@@ -43,3 +44,21 @@ func MustAddCmd(fn any, prefixWords ...string) any {
	commands.MustAdd(fn, prefixWords...)
	return nil
}

var RootCmd = &cobra.Command{
	Use:   "middleware",
	Short: "scheduler middleware",
	Long:  `scheduler middleware`,
}

func init() {
	var configPath string
	var address string

	RootCmd.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file")
	RootCmd.Flags().StringVarP(&address, "listen", "l", ":7891", "listen address")

	RootCmd.Run = func(cmd *cobra.Command, args []string) {
		serve(configPath, address)
	}
}
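With this wiring, startup options are parsed by cobra instead of being read directly off the command line, and the parsed flags are handed to serve (defined in the next file). A minimal, self-contained sketch of the same pattern — the names and printed output are illustrative, not code from this repository:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	var configPath, address string

	root := &cobra.Command{
		Use:   "middleware",
		Short: "scheduler middleware",
		Run: func(cmd *cobra.Command, args []string) {
			// The real command calls serve(configPath, address) here.
			fmt.Println("config:", configPath, "listen:", address)
		},
	}
	root.Flags().StringVarP(&configPath, "config", "c", "", "Path to config file")
	root.Flags().StringVarP(&address, "listen", "l", ":7891", "listen address")

	// Invoked as, e.g.: middleware -c middleware.config.json -l :7891
	_ = root.Execute()
}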

View File

@@ -2,7 +2,17 @@ package cmdline

import (
	"fmt"
	pcmHubClient "gitlink.org.cn/JointCloud/pcm-hub/client"
	pcmHubConfig "gitlink.org.cn/JointCloud/pcm-hub/config"
	"gitlink.org.cn/cloudream/common/pkgs/logger"
	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
	"gitlink.org.cn/cloudream/scheduler/common/pkgs/db"
	"gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler2"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/config"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/http"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/services"
	"os"
)

var _ = MustAddCmd(func(ctx CommandContext, args []string) error {
@@ -23,3 +33,58 @@ var _ = MustAddCmd(func(ctx CommandContext, args []string) error {
	return nil
}, "serve", "http")

func serve(configPath string, address string) {
	err := config.Init(configPath)
	if err != nil {
		fmt.Printf("init config failed, err: %s", err.Error())
		os.Exit(1)
	}

	err = logger.Init(&config.Cfg().Logger)
	if err != nil {
		fmt.Printf("init logger failed, err: %s", err.Error())
		os.Exit(1)
	}

	schglb.InitPCMSchePool(&config.Cfg().PCMScheduler)
	schglb.InitUploaderPool(&config.Cfg().Uploader)
	schglb.InitBlockChainPool(&config.Cfg().BlockChain)
	schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)

	dbSvc, err := db.NewDB(&config.Cfg().DB)
	if err != nil {
		logger.Fatalf("new db2 failed, err: %s", err.Error())
	}

	preSchr := prescheduler2.NewDefaultPreScheduler()

	nodeSvc := jobmgr.NewNodeService()
	jobMgr, err := jobmgr.NewManager(dbSvc, nodeSvc)
	if err != nil {
		logger.Fatalf("new job manager failed, err: %s", err.Error())
	}

	hubConfig := &pcmHubConfig.Config{
		Platforms: config.Cfg().PCMHub,
	}
	hubClient, err := pcmHubClient.NewClient(hubConfig)
	if err != nil {
		logger.Fatalf("new pcm hub client failed, err: %s", err.Error())
	}

	svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient)
	if err != nil {
		logger.Fatalf("new service failed, err: %s", err.Error())
	}

	httpSvr, err := http.NewServer(address, svc)
	if err != nil {
		logger.Errorf(err.Error())
		os.Exit(1)
	}

	err = httpSvr.Serve()
	if err != nil {
		logger.Errorf(err.Error())
		os.Exit(1)
	}
}

View File

@@ -23,8 +23,12 @@ type Config struct {
var cfg Config

-func Init() error {
-	return config.DefaultLoad("middleware", &cfg)
+func Init(path string) error {
+	if path == "" {
+		return config.DefaultLoad("middleware", &cfg)
+	}
+	return config.Load(path, &cfg)
}

func Cfg() *Config {
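A short usage sketch of the new Init signature — the file path below is an assumption; only the empty-path fallback comes from the change above:

package main

import (
	"fmt"
	"os"

	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/config"
)

func main() {
	// "" keeps the old behaviour (config.DefaultLoad with the "middleware" name);
	// a non-empty path (hypothetical example below) loads that exact file.
	if err := config.Init("confs/middleware.config.json"); err != nil {
		fmt.Printf("init config failed, err: %s", err.Error())
		os.Exit(1)
	}
	_ = config.Cfg() // access the loaded configuration
}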

View File

@@ -9,6 +9,7 @@ import (
	cdssdk "gitlink.org.cn/cloudream/common/sdks/storage"
	uploadersdk "gitlink.org.cn/cloudream/common/sdks/uploader"
	"gitlink.org.cn/cloudream/common/utils/serder"
	jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task"
	"io"
	"net/http"
)
@@ -124,16 +125,43 @@ func (s *JobSetService) Upload(ctx *gin.Context) {
	}

	blockChainToken := ctx.Request.Header.Get("Authorization")

+	task := jobTask.NewJobTask[sch.TaskMessage]()
-	_, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken)
+	_, err = s.svc.JobSetSvc().Upload(req.UserID, req.UploadParams, blockChainToken, task)
	if err != nil {
		log.Warnf("uploading file: %s", err.Error())
		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "upload file failed, error: "+err.Error()))
		return
	}

-	ctx.JSON(http.StatusOK, OK("success"))
+	//ctx.JSON(http.StatusOK, OK("success"))
+
+	flusher, ok := ctx.Writer.(http.Flusher)
+	if !ok {
+		http.Error(ctx.Writer, "Streaming not supported", http.StatusInternalServerError)
+		return
+	}
+
+	ctx.Writer.Header().Set("Cache-Control", "no-cache")
+	ctx.Writer.Header().Set("Content-Type", "text/event-stream")
+
+	for {
+		fut := task.Receive()
+		receive := <-fut.Chan()
+		if receive.Value.Status == sch.FailedStatus || receive.Value.Status == sch.SuccessStatus {
+			//i := 1
+			data := "event: message\ndata: " + receive.Value.Message + "\n\n"
+			_, err = ctx.Writer.Write([]byte(data))
+			println(receive.Value.Message)
+			if err != nil {
+				break
+			}
+			flusher.Flush()
+			ctx.Writer.CloseNotify()
+			return
+		}
+	}
}

type CreateFolderReq struct {
@@ -343,6 +371,7 @@ func (s *JobSetService) RemoveBinding(ctx *gin.Context) {

type QueryBindingReq struct {
	DataType string                    `json:"dataType" binding:"required"`
	Param    sch.QueryBindingDataParam `json:"param" binding:"required"`
	Filters  sch.QueryBindingFilters   `json:"filters"`
}

type QueryBindingResp struct {
@@ -365,7 +394,7 @@ func (s *JobSetService) QueryBinding(ctx *gin.Context) {
		return
	}

-	details, err := s.svc.JobSetSvc().QueryBinding(req.DataType, req.Param)
+	details, err := s.svc.JobSetSvc().QueryBinding(req.DataType, req.Param, req.Filters)
	if err != nil {
		log.Warnf("getting service list: %s", err.Error())
		ctx.JSON(http.StatusOK, Failed(errorcode.OperationFailed, "binding data failed, error: "+err.Error()))
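The Upload handler above now keeps the HTTP connection open and streams the terminal task status as a Server-Sent Event ("event: message" / "data: ..." lines), flushing once and closing. A hedged client-side sketch; the URL and request method are assumptions (the real route and its JSON body are defined elsewhere in the server), only the line format comes from the handler:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Assumed endpoint for illustration only.
	resp, err := http.Get("http://localhost:7891/jobSet/upload")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "data: ") {
			fmt.Println("task message:", strings.TrimPrefix(line, "data: "))
			return // the handler sends a single terminal message and closes
		}
	}
}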

View File

@@ -14,6 +14,7 @@ import (
	jobmod "gitlink.org.cn/cloudream/scheduler/common/models/job"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr"
	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr/job/state"
	jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task"
	"strconv"
	"strings"
	"sync"
@@ -26,16 +27,17 @@ type DataUpload struct {
	dataType        string
	blockChainToken string
	//storages []cdssdk.StorageID
+	task *jobTask.JobTask[sch.TaskMessage]

	lock sync.Mutex
}

-func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string) *DataUpload {
+func NewDataUpload(userID cdssdk.UserID, uploadInfo sch.UploadInfo, dataType string, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) *DataUpload {
	return &DataUpload{
		userID:          userID,
		uploadInfo:      uploadInfo,
		dataType:        dataType,
		blockChainToken: blockChainToken,
-		//storages: storages,
+		task:            task,
	}
}
@@ -86,12 +88,22 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
	}

	uploadResp, err := uploaderCli.Upload(req)
	if err != nil {
		message := sch.TaskMessage{
			Status:  sch.FailedStatus,
			Message: fmt.Sprintf("upload data: %w", err),
		}
		s.task.Send(message)
		return fmt.Errorf("upload data: %w", err)
	}

	if uploadResp.JsonData != "" {
		err = rtx.Mgr.DB.UploadData().UpdatePackage(rtx.Mgr.DB.DefCtx(), uploadResp.PackageID, uploadResp.JsonData, -1)
		if err != nil {
			message := sch.TaskMessage{
				Status:  sch.FailedStatus,
				Message: fmt.Sprintf("update package: %w", err),
			}
			s.task.Send(message)
			return fmt.Errorf("update package: %w", err)
		}
	}
@@ -100,6 +112,12 @@ func (s *DataUpload) do(rtx jobmgr.JobStateRunContext) error {
	}

	message := sch.TaskMessage{
		Status:  sch.SuccessStatus,
		Message: "upload success!",
	}
	s.task.Send(message)

	// Submit for evidence storage (blockchain)
	blockChains, err := s.blockChain(objectIDs)
	if err != nil {
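One hedged observation on the messages built above: fmt.Sprintf does not interpret the %w verb (it is only special in fmt.Errorf), so the streamed text will contain a literal %!w(...) token rather than the wrapped error; %v or fmt.Errorf(...).Error() would produce the intended text. A minimal illustration of the standard-library behaviour:

package main

import (
	"errors"
	"fmt"
)

func main() {
	err := errors.New("boom")

	// %w is only handled by fmt.Errorf; Sprintf emits a %!w(...) placeholder.
	bad := fmt.Sprintf("upload data: %w", err)
	good := fmt.Sprintf("upload data: %v", err)

	fmt.Println(bad)  // upload data: %!w(*errors.errorString=&{boom})
	fmt.Println(good) // upload data: boom
}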

View File

@@ -3,6 +3,9 @@ package jobTask

import (
	"fmt"
	"gitlink.org.cn/cloudream/common/pkgs/async"
	//"gitlink.org.cn/cloudream/common/pkgs/async"
	//"gitlink.org.cn/cloudream/common/utils/sync2"
	"gitlink.org.cn/cloudream/common/pkgs/future"
	"gitlink.org.cn/cloudream/common/pkgs/logger"
	"math/rand"
@@ -37,9 +40,13 @@ func (c *JobTask[T]) Receive() future.Future1[T] {
	return c.taskChan.Receive()
}

-func (c *JobTask[T]) Send(info any) {
-	logger.Info("send http")
+func (c *JobTask[T]) Send(info T) {
+	err := c.taskChan.Send(info)
+	if err != nil {
+		logger.Error(err.Error())
+		return
+	}
+	//logger.Info("send http")
}

func (c *JobTask[T]) Chan() *async.UnboundChannel[T] {

View File

@@ -7,6 +7,7 @@ import (
	"gitlink.org.cn/JointCloud/pcm-hub/aikit/common/algorithm"
	"gitlink.org.cn/JointCloud/pcm-hub/aikit/common/dataset"
	"gitlink.org.cn/JointCloud/pcm-hub/aikit/common/model"
	jobTask "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/task"
	"sort"

	"gitlink.org.cn/cloudream/common/pkgs/logger"
@@ -50,7 +51,7 @@ func (svc *JobSetService) PreScheduler(jobSet schsdk.JobSetInfo) (*jobmod.JobSet
	return schScheme, uploadScheme, nil
}

-func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, blockChainToken string) (*schsdk.JobSetID, error) {
+func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams, blockChainToken string, task *jobTask.JobTask[sch.TaskMessage]) (*schsdk.JobSetID, error) {
	logger.Debugf("uploading job")

	// Query the clusters maintained in the database
@@ -101,7 +102,7 @@ func (svc *JobSetService) Upload(userID cdssdk.UserID, params sch.UploadParams,
	jo := job.NewNormalJob(schsdk.NormalJobInfo{})
	jobs = append(jobs, jobmgr.SubmittingJob{
		Body: jo,
-		InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken),
+		InitState: state2.NewDataUpload(userID, params.UploadInfo, params.DataType, blockChainToken, task),
	})

	jobSetID := svc.jobMgr.SubmitJobSet(jobs)
@@ -595,7 +596,7 @@ func (svc *JobSetService) DeleteBinding(IDs []int64) error {
	return nil
}

-func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDataParam) ([]uploadersdk.BindingDetail, error) {
+func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDataParam, filters sch.QueryBindingFilters) ([]uploadersdk.BindingDetail, error) {
	switch p := param.(type) {
	case *sch.PrivateLevel:
@@ -640,6 +641,15 @@ func (svc *JobSetService) QueryBinding(dataType string, param sch.QueryBindingDa
	for _, data := range datas {
		var info sch.DataBinding

		// If a status filter is given, show only data in that status
		if filters.Status != "" && data.Status != filters.Status {
			continue
		}
		if filters.Name != "" && data.Name != filters.Name {
			continue
		}

		// Only data in the approved status exposes its details
		if data.Status == sch.ApprovedStatus {
			binding := uploadersdk.Binding{

View File

@@ -12,6 +12,7 @@ type Service struct {
	jobMgr    *jobmgr.Manager
	db        *db.DB
	hubClient *hub.Client
	//JobTask *jobTask.JobTask[string]
}

func NewService(preScheduler prescheduler2.PreScheduler, jobMgr *jobmgr.Manager, db *db.DB, hubClient *hub.Client) (*Service, error) {

View File

@@ -1,67 +1,9 @@
package main

import (
-	"fmt"
-	pcmHubClient "gitlink.org.cn/JointCloud/pcm-hub/client"
-	pcmHubConfig "gitlink.org.cn/JointCloud/pcm-hub/config"
-	"gitlink.org.cn/cloudream/common/pkgs/logger"
-	schglb "gitlink.org.cn/cloudream/scheduler/common/globals"
-	"gitlink.org.cn/cloudream/scheduler/common/pkgs/db"
-	"gitlink.org.cn/cloudream/scheduler/common/pkgs/prescheduler2"
-	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/cmdline"
-	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/config"
-	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/manager/jobmgr"
-	"gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/services"
-	"os"
+	cmd "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/cmdline"
)

func main() {
-	err := config.Init()
-	if err != nil {
-		fmt.Printf("init config failed, err: %s", err.Error())
-		os.Exit(1)
-	}
-
-	err = logger.Init(&config.Cfg().Logger)
-	if err != nil {
-		fmt.Printf("init logger failed, err: %s", err.Error())
-		os.Exit(1)
-	}
-
-	schglb.InitPCMSchePool(&config.Cfg().PCMScheduler)
-	schglb.InitUploaderPool(&config.Cfg().Uploader)
-	schglb.InitBlockChainPool(&config.Cfg().BlockChain)
-	schglb.InitCloudreamStoragePool(&config.Cfg().CloudreamStorage)
-
-	dbSvc, err := db.NewDB(&config.Cfg().DB)
-	if err != nil {
-		logger.Fatalf("new db2 failed, err: %s", err.Error())
-	}
-
-	preSchr := prescheduler2.NewDefaultPreScheduler()
-
-	nodeSvc := jobmgr.NewNodeService()
-	jobMgr, err := jobmgr.NewManager(dbSvc, nodeSvc)
-	if err != nil {
-		logger.Fatalf("new job manager failed, err: %s", err.Error())
-	}
-
-	hubConfig := &pcmHubConfig.Config{
-		Platforms: config.Cfg().PCMHub,
-	}
-	hubClient, err := pcmHubClient.NewClient(hubConfig)
-	if err != nil {
-		logger.Fatalf("new pcm hub client failed, err: %s", err.Error())
-	}
-
-	svc, err := services.NewService(preSchr, jobMgr, dbSvc, hubClient)
-	if err != nil {
-		logger.Fatalf("new service failed, err: %s", err.Error())
-	}
-
-	cmds, err := cmdline.NewCommandline(svc)
-	if err != nil {
-		logger.Warnf("new command line failed, err: %s", err.Error())
-		os.Exit(1)
-	}
-
-	cmds.DispatchCommand(os.Args[1:])
+	cmd.RootCmd.Execute()
}
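The rewritten main delegates everything to cobra but discards the error returned by Execute, so a bad flag would not change the process exit status. A hedged variant (not part of this commit) that preserves a non-zero exit code:

package main

import (
	"fmt"
	"os"

	cmd "gitlink.org.cn/cloudream/scheduler/schedulerMiddleware/internal/cmdline"
)

func main() {
	// Execute reports flag and startup errors; exit non-zero so callers can detect them.
	if err := cmd.RootCmd.Execute(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}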