228 lines
6.5 KiB
Go
228 lines
6.5 KiB
Go
package nacos
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/JCCE-nudt/zero-contrib/zrpc/registry/nacos"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/clients"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/clients/nacos_client"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_cache"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client/naming_proxy"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/common/http_agent"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/common/nacos_server"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/common/security"
|
||
"github.com/nacos-group/nacos-sdk-go/v2/vo"
|
||
"github.com/zeromicro/go-zero/core/logx"
|
||
"github.com/zeromicro/go-zero/rest"
|
||
"github.com/zeromicro/go-zero/zrpc"
|
||
nacosVo "gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/helper/nacos/vo"
|
||
"gitlink.org.cn/jcce-pcm/pcm-coordinator/pkg/utils"
|
||
"net/http"
|
||
"sync"
|
||
)
|
||
|
||
// BootstrapConfig is the top-level bootstrap configuration; it only wraps the
// Nacos settings.
type BootstrapConfig struct {
	NacosConfig NacosConfig
}

// ListenConfig is a callback that receives the raw content of a configuration
// entry as a string.
type ListenConfig func(data string)

// NacosServerConfig holds the address of a single Nacos server.
type NacosServerConfig struct {
	IpAddr string
	Port   uint64
}

// NacosClientConfig holds the client-side options that are copied field by
// field into the SDK client configuration.
type NacosClientConfig struct {
	NamespaceId         string
	TimeoutMs           uint64
	NotLoadCacheAtStart bool
	LogDir              string
	CacheDir            string
	LogLevel            string
}

// NacosConfig describes how to reach Nacos (servers and client options) and
// which configuration entry (DataId and Group) to work with.
type NacosConfig struct {
	ServerConfigs []NacosServerConfig
	ClientConfig  NacosClientConfig
	DataId        string
	Group         string
}
|
||
|
||
// NamingClient ...
|
||
type NamingClient struct {
|
||
nacos_client.INacosClient
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
serviceProxy naming_proxy.INamingProxy
|
||
serviceInfoHolder *naming_cache.ServiceInfoHolder
|
||
clientConfig constant.ClientConfig
|
||
}
|
||
|
||
func (n *NacosConfig) Discovery(c *zrpc.RpcServerConf) {
|
||
sc, cc := n.buildConfig()
|
||
opts := nacos.NewNacosConfig(c.Name, c.ListenOn, sc, &cc)
|
||
opts.Group = n.Group
|
||
err := nacos.RegisterService(opts)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
}
|
||
|
||
func (n *NacosConfig) DiscoveryRest(c *rest.RestConf) {
|
||
sc, cc := n.buildConfig()
|
||
opts := nacos.NewNacosConfig(c.Name, fmt.Sprintf("%s:%d", c.Host, c.Port), sc, &cc)
|
||
err := nacos.RegisterService(opts)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
}
|
||
|
||
func (n *NacosConfig) InitConfig(listenConfigCallback ListenConfig) string {
|
||
//nacos server
|
||
sc, cc := n.buildConfig()
|
||
|
||
pa := vo.NacosClientParam{
|
||
ClientConfig: &cc,
|
||
ServerConfigs: sc,
|
||
}
|
||
configClient, err := clients.NewConfigClient(pa)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
//获取配置中心内容
|
||
content, err := configClient.GetConfig(vo.ConfigParam{
|
||
DataId: n.DataId,
|
||
Group: n.Group,
|
||
})
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
//设置配置监听
|
||
if err = configClient.ListenConfig(vo.ConfigParam{
|
||
DataId: n.DataId,
|
||
Group: n.Group,
|
||
OnChange: func(namespace, group, dataId, data string) {
|
||
//配置文件产生变化就会触发
|
||
if len(data) == 0 {
|
||
logx.Errorf("listen nacos data nil error , namespace : %s,group : %s , dataId : %s , data : %s", namespace, group, dataId, data)
|
||
return
|
||
}
|
||
listenConfigCallback(data)
|
||
},
|
||
}); err != nil {
|
||
panic(err)
|
||
}
|
||
|
||
if len(content) == 0 {
|
||
panic("read nacos nacos content err , content is nil")
|
||
}
|
||
|
||
return content
|
||
}
|
||
|
||
func (n *NacosConfig) buildConfig() ([]constant.ServerConfig, constant.ClientConfig) {
|
||
var sc []constant.ServerConfig
|
||
if len(n.ServerConfigs) == 0 {
|
||
panic("nacos server no set")
|
||
}
|
||
for _, serveConfig := range n.ServerConfigs {
|
||
sc = append(sc, constant.ServerConfig{
|
||
Port: serveConfig.Port,
|
||
IpAddr: serveConfig.IpAddr,
|
||
},
|
||
)
|
||
}
|
||
|
||
//nacos client
|
||
cc := constant.ClientConfig{
|
||
NamespaceId: n.ClientConfig.NamespaceId,
|
||
TimeoutMs: n.ClientConfig.TimeoutMs,
|
||
NotLoadCacheAtStart: n.ClientConfig.NotLoadCacheAtStart,
|
||
LogDir: n.ClientConfig.LogDir,
|
||
CacheDir: n.ClientConfig.CacheDir,
|
||
LogLevel: n.ClientConfig.LogLevel,
|
||
}
|
||
return sc, cc
|
||
}
|
||
|
||
type NacosServer struct {
|
||
sync.RWMutex
|
||
securityLogin security.AuthClient
|
||
serverList []constant.ServerConfig
|
||
httpAgent http_agent.IHttpAgent
|
||
timeoutMs uint64
|
||
endpoint string
|
||
lastSrvRefTime int64
|
||
vipSrvRefInterMills int64
|
||
contextPath string
|
||
currentIndex int32
|
||
ServerSrcChangeSignal chan struct{}
|
||
}
|
||
|
||
// GetAllServicesInfo Get all Services
|
||
func (n *NacosConfig) GetAllServicesInfo() (nacosVo.NacosServiceList, error) {
|
||
nacosServiceList := nacosVo.NacosServiceList{}
|
||
api := constant.SERVICE_BASE_PATH + "/catalog/services"
|
||
nacosServer, err := nacos_server.NewNacosServer(context.Background(),
|
||
[]constant.ServerConfig{*constant.NewServerConfig(n.ServerConfigs[0].IpAddr, n.ServerConfigs[0].Port)},
|
||
constant.ClientConfig{},
|
||
&http_agent.HttpAgent{},
|
||
1000,
|
||
"")
|
||
if err != nil {
|
||
return nacosServiceList, err
|
||
}
|
||
params := map[string]string{}
|
||
params["namespaceId"] = n.ClientConfig.NamespaceId
|
||
params["groupName"] = ""
|
||
params["pageNo"] = "1"
|
||
params["pageSize"] = "10000"
|
||
result, err := nacosServer.ReqApi(api, params, http.MethodGet, constant.ClientConfig{})
|
||
if err != nil {
|
||
logx.Errorf("Failed to get all services ,error: <%+v>, namespace : <%s> ", err, n.ClientConfig.NamespaceId)
|
||
return nacosServiceList, err
|
||
}
|
||
err1 := json.Unmarshal([]byte(result), &nacosServiceList)
|
||
if err1 != nil {
|
||
logx.Errorf("Conversion failed ,error: %+v, str: %s", err1, result)
|
||
return nacosServiceList, err
|
||
}
|
||
return nacosServiceList, err
|
||
}
|
||
|
||
// GetAllGroupName Get all GroupName
|
||
func (n *NacosConfig) GetAllGroupName() (nacosGroupList nacosVo.NacosGroupList, err error) {
|
||
nacosServiceList := nacosVo.NacosServiceList{}
|
||
api := constant.SERVICE_BASE_PATH + "/catalog/services"
|
||
nacosServer, err := nacos_server.NewNacosServer(context.Background(),
|
||
[]constant.ServerConfig{*constant.NewServerConfig(n.ServerConfigs[0].IpAddr, n.ServerConfigs[0].Port)},
|
||
constant.ClientConfig{},
|
||
&http_agent.HttpAgent{},
|
||
1000,
|
||
"")
|
||
if err != nil {
|
||
return nacosGroupList, err
|
||
}
|
||
params := map[string]string{}
|
||
params["namespaceId"] = "test"
|
||
params["groupName"] = ""
|
||
params["pageNo"] = "1"
|
||
params["pageSize"] = "10000"
|
||
result, err := nacosServer.ReqApi(api, params, http.MethodGet, constant.ClientConfig{})
|
||
err1 := json.Unmarshal([]byte(result), &nacosServiceList)
|
||
if err1 != nil {
|
||
logx.Errorf("Conversion failed ,error: %+v, str: %s", err1, result)
|
||
return nacosGroupList, err1
|
||
}
|
||
for _, v := range nacosServiceList.ServiceList {
|
||
nacosGroupList.GroupName = append(nacosGroupList.GroupName, v.GroupName)
|
||
}
|
||
nacosGroupList.GroupName = utils.RemoveDuplication_map(nacosGroupList.GroupName)
|
||
|
||
return nacosGroupList, err
|
||
}
|