315 lines
10 KiB
Go
315 lines
10 KiB
Go
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
package server
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/pkg/errors"
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
"github.com/oceanbase/configserver/ent"
|
|
"github.com/oceanbase/configserver/ent/obcluster"
|
|
"github.com/oceanbase/configserver/model"
|
|
)
|
|
|
|
var obRootServiceGetOnce sync.Once
|
|
var obRootServiceGetFunc func(*gin.Context)
|
|
var obRootServicePostOnce sync.Once
|
|
var obRootServicePostFunc func(*gin.Context)
|
|
var obRootServiceDeleteOnce sync.Once
|
|
var obRootServiceDeleteFunc func(*gin.Context)
|
|
var obIdcRegionInfoOnce sync.Once
|
|
var obIdcRegionInfoFunc func(*gin.Context)
|
|
|
|
func getObIdcRegionInfoFunc() func(c *gin.Context) {
|
|
obIdcRegionInfoOnce.Do(func() {
|
|
obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
|
|
|
|
})
|
|
return obIdcRegionInfoFunc
|
|
}
|
|
|
|
func getObRootServiceGetFunc() func(*gin.Context) {
|
|
obRootServiceGetOnce.Do(func() {
|
|
obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
|
|
})
|
|
return obRootServiceGetFunc
|
|
}
|
|
|
|
func getObRootServicePostFunc() func(*gin.Context) {
|
|
obRootServicePostOnce.Do(func() {
|
|
obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
|
|
})
|
|
return obRootServicePostFunc
|
|
}
|
|
|
|
func getObRootServiceDeleteFunc() func(*gin.Context) {
|
|
obRootServiceDeleteOnce.Do(func() {
|
|
obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
|
|
})
|
|
return obRootServiceDeleteFunc
|
|
}
|
|
|
|
type RootServiceInfoParam struct {
|
|
ObCluster string
|
|
ObClusterId int64
|
|
Version int
|
|
}
|
|
|
|
func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
|
|
var err error
|
|
name := ""
|
|
obCluster, obClusterOk := c.GetQuery("ObCluster")
|
|
obRegion, obRegionOk := c.GetQuery("ObRegion")
|
|
if obClusterOk {
|
|
name = obCluster
|
|
}
|
|
if obRegionOk {
|
|
name = obRegion
|
|
}
|
|
|
|
if len(name) == 0 {
|
|
return nil, errors.New("no obcluster or obregion")
|
|
}
|
|
|
|
obClusterId, obClusterIdOk := c.GetQuery("ObClusterId")
|
|
obRegionId, obRegionIdOk := c.GetQuery("ObRegionId")
|
|
|
|
var clusterId int64
|
|
var clusterIdStr string
|
|
if obClusterIdOk {
|
|
clusterIdStr = obClusterId
|
|
}
|
|
if obRegionIdOk {
|
|
clusterIdStr = obRegionId
|
|
}
|
|
if clusterIdStr != "" {
|
|
clusterId, err = strconv.ParseInt(clusterIdStr, 10, 64)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "parse ob cluster id")
|
|
}
|
|
}
|
|
|
|
version := 0
|
|
versionStr, versionOk := c.GetQuery("version")
|
|
if versionOk {
|
|
version, err = strconv.Atoi(versionStr)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "parse version")
|
|
}
|
|
}
|
|
return &RootServiceInfoParam{
|
|
ObCluster: name,
|
|
ObClusterId: clusterId,
|
|
Version: version,
|
|
}, nil
|
|
}
|
|
|
|
func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
|
|
var primaryCluster *model.ObRootServiceInfo
|
|
for _, cluster := range clusters {
|
|
if primaryCluster == nil {
|
|
primaryCluster = cluster
|
|
} else {
|
|
if primaryCluster.Type != "PRIMARY" {
|
|
if cluster.Type == "PRIMARY" || cluster.TimeStamp > primaryCluster.TimeStamp {
|
|
primaryCluster = cluster
|
|
}
|
|
} else {
|
|
if cluster.Type == "PRIMARY" && cluster.TimeStamp > primaryCluster.TimeStamp {
|
|
primaryCluster = cluster
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return primaryCluster
|
|
}
|
|
|
|
func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
|
// return empty idc list
|
|
param, err := getCommonParam(c)
|
|
if err != nil {
|
|
return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
|
|
}
|
|
|
|
rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
|
|
if err != nil {
|
|
if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
|
|
return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
|
|
} else {
|
|
return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
|
|
}
|
|
}
|
|
|
|
idcList := make([]*model.IdcRegionInfo, 0, 0)
|
|
if param.Version < 2 || param.ObClusterId > 0 {
|
|
primaryCluster := selectPrimaryCluster(rootServiceInfoList)
|
|
obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
|
|
Cluster: primaryCluster.ObCluster,
|
|
ClusterId: primaryCluster.ObClusterId,
|
|
IdcList: idcList,
|
|
ReadonlyRsList: "",
|
|
}
|
|
return NewSuccessResponse(obClusterIdcRegionInfo)
|
|
} else {
|
|
obClusterIdcRegionInfoList := make([]*model.ObClusterIdcRegionInfo, 0, 4)
|
|
for _, cluster := range rootServiceInfoList {
|
|
obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
|
|
Cluster: cluster.ObCluster,
|
|
ClusterId: cluster.ObClusterId,
|
|
IdcList: idcList,
|
|
ReadonlyRsList: "",
|
|
}
|
|
obClusterIdcRegionInfoList = append(obClusterIdcRegionInfoList, obClusterIdcRegionInfo)
|
|
}
|
|
return NewSuccessResponse(obClusterIdcRegionInfoList)
|
|
}
|
|
}
|
|
|
|
func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
|
|
var clusters []*ent.ObCluster
|
|
var err error
|
|
rootServiceInfoList := make([]*model.ObRootServiceInfo, 0, 4)
|
|
client := GetConfigServer().Client
|
|
|
|
if obClusterId != 0 {
|
|
log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
|
|
clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).All(context.Background())
|
|
} else {
|
|
log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
|
|
clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster)).All(context.Background())
|
|
}
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "query ob clusters from db")
|
|
}
|
|
if len(clusters) == 0 {
|
|
return rootServiceInfoList, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
|
|
}
|
|
for _, cluster := range clusters {
|
|
var rootServiceInfo model.ObRootServiceInfo
|
|
err = json.Unmarshal([]byte(cluster.RootserviceJSON), &rootServiceInfo)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "deserialize root service info")
|
|
}
|
|
rootServiceInfo.Fill()
|
|
rootServiceInfoList = append(rootServiceInfoList, &rootServiceInfo)
|
|
}
|
|
return rootServiceInfoList, nil
|
|
}
|
|
|
|
func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
|
var response *ApiResponse
|
|
param, err := getCommonParam(c)
|
|
if err != nil {
|
|
return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
|
|
}
|
|
rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
|
|
if err != nil {
|
|
if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
|
|
return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
|
|
} else {
|
|
return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
|
|
}
|
|
}
|
|
|
|
if param.Version < 2 || param.ObClusterId > 0 {
|
|
log.WithContext(ctxlog).Infof("return primary ob cluster")
|
|
response = NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
|
|
} else {
|
|
log.WithContext(ctxlog).Infof("return all ob clusters")
|
|
response = NewSuccessResponse(rootServiceInfoList)
|
|
}
|
|
return response
|
|
}
|
|
|
|
func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
|
var response *ApiResponse
|
|
client := GetConfigServer().Client
|
|
obRootServiceInfo := new(model.ObRootServiceInfo)
|
|
err := c.ShouldBindJSON(obRootServiceInfo)
|
|
if err != nil {
|
|
return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
|
|
}
|
|
obRootServiceInfo.Fill()
|
|
param, err := getCommonParam(c)
|
|
if err != nil {
|
|
return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
|
|
}
|
|
if len(obRootServiceInfo.ObCluster) == 0 {
|
|
return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
|
|
}
|
|
if param.Version > 1 {
|
|
if len(obRootServiceInfo.Type) == 0 {
|
|
return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
|
|
}
|
|
}
|
|
|
|
rsBytes, err := json.Marshal(obRootServiceInfo)
|
|
if err != nil {
|
|
response = NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
|
|
} else {
|
|
rootServiceInfoJson := string(rsBytes)
|
|
log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)
|
|
|
|
err := client.ObCluster.
|
|
Create().
|
|
SetName(obRootServiceInfo.ObCluster).
|
|
SetObClusterID(obRootServiceInfo.ObClusterId).
|
|
SetType(obRootServiceInfo.Type).
|
|
SetRootserviceJSON(rootServiceInfoJson).
|
|
OnConflict().
|
|
SetRootserviceJSON(rootServiceInfoJson).
|
|
Exec(context.Background())
|
|
if err != nil {
|
|
response = NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
|
|
} else {
|
|
response = NewSuccessResponse("successful")
|
|
}
|
|
}
|
|
return response
|
|
}
|
|
|
|
func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
|
var response *ApiResponse
|
|
client := GetConfigServer().Client
|
|
|
|
param, err := getCommonParam(c)
|
|
if err != nil {
|
|
return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
|
|
}
|
|
if param.Version < 2 {
|
|
response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
|
|
} else if param.ObClusterId == 0 {
|
|
response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
|
|
} else {
|
|
affected, err := client.ObCluster.
|
|
Delete().
|
|
Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
|
|
Exec(context.Background())
|
|
if err != nil {
|
|
response = NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
|
|
} else {
|
|
log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
|
|
response = NewSuccessResponse("success")
|
|
}
|
|
}
|
|
return response
|
|
|
|
}
|