add ob configserver (#943)
This commit is contained in:
314
tools/ob-configserver/server/observer_handler.go
Normal file
314
tools/ob-configserver/server/observer_handler.go
Normal file
@ -0,0 +1,314 @@
|
||||
/**
|
||||
* Copyright (c) 2021 OceanBase
|
||||
* OceanBase CE is licensed under Mulan PubL v2.
|
||||
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
* You may obtain a copy of Mulan PubL v2 at:
|
||||
* http://license.coscl.org.cn/MulanPubL-2.0
|
||||
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/pkg/errors"
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/oceanbase/configserver/ent"
|
||||
"github.com/oceanbase/configserver/ent/obcluster"
|
||||
"github.com/oceanbase/configserver/model"
|
||||
)
|
||||
|
||||
var obRootServiceGetOnce sync.Once
|
||||
var obRootServiceGetFunc func(*gin.Context)
|
||||
var obRootServicePostOnce sync.Once
|
||||
var obRootServicePostFunc func(*gin.Context)
|
||||
var obRootServiceDeleteOnce sync.Once
|
||||
var obRootServiceDeleteFunc func(*gin.Context)
|
||||
var obIdcRegionInfoOnce sync.Once
|
||||
var obIdcRegionInfoFunc func(*gin.Context)
|
||||
|
||||
func getObIdcRegionInfoFunc() func(c *gin.Context) {
|
||||
obIdcRegionInfoOnce.Do(func() {
|
||||
obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
|
||||
|
||||
})
|
||||
return obIdcRegionInfoFunc
|
||||
}
|
||||
|
||||
func getObRootServiceGetFunc() func(*gin.Context) {
|
||||
obRootServiceGetOnce.Do(func() {
|
||||
obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
|
||||
})
|
||||
return obRootServiceGetFunc
|
||||
}
|
||||
|
||||
func getObRootServicePostFunc() func(*gin.Context) {
|
||||
obRootServicePostOnce.Do(func() {
|
||||
obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
|
||||
})
|
||||
return obRootServicePostFunc
|
||||
}
|
||||
|
||||
func getObRootServiceDeleteFunc() func(*gin.Context) {
|
||||
obRootServiceDeleteOnce.Do(func() {
|
||||
obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
|
||||
})
|
||||
return obRootServiceDeleteFunc
|
||||
}
|
||||
|
||||
type RootServiceInfoParam struct {
|
||||
ObCluster string
|
||||
ObClusterId int64
|
||||
Version int
|
||||
}
|
||||
|
||||
func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
|
||||
var err error
|
||||
name := ""
|
||||
obCluster, obClusterOk := c.GetQuery("ObCluster")
|
||||
obRegion, obRegionOk := c.GetQuery("ObRegion")
|
||||
if obClusterOk {
|
||||
name = obCluster
|
||||
}
|
||||
if obRegionOk {
|
||||
name = obRegion
|
||||
}
|
||||
|
||||
if len(name) == 0 {
|
||||
return nil, errors.New("no obcluster or obregion")
|
||||
}
|
||||
|
||||
obClusterId, obClusterIdOk := c.GetQuery("ObClusterId")
|
||||
obRegionId, obRegionIdOk := c.GetQuery("ObRegionId")
|
||||
|
||||
var clusterId int64
|
||||
var clusterIdStr string
|
||||
if obClusterIdOk {
|
||||
clusterIdStr = obClusterId
|
||||
}
|
||||
if obRegionIdOk {
|
||||
clusterIdStr = obRegionId
|
||||
}
|
||||
if clusterIdStr != "" {
|
||||
clusterId, err = strconv.ParseInt(clusterIdStr, 10, 64)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "parse ob cluster id")
|
||||
}
|
||||
}
|
||||
|
||||
version := 0
|
||||
versionStr, versionOk := c.GetQuery("version")
|
||||
if versionOk {
|
||||
version, err = strconv.Atoi(versionStr)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "parse version")
|
||||
}
|
||||
}
|
||||
return &RootServiceInfoParam{
|
||||
ObCluster: name,
|
||||
ObClusterId: clusterId,
|
||||
Version: version,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
|
||||
var primaryCluster *model.ObRootServiceInfo
|
||||
for _, cluster := range clusters {
|
||||
if primaryCluster == nil {
|
||||
primaryCluster = cluster
|
||||
} else {
|
||||
if primaryCluster.Type != "PRIMARY" {
|
||||
if cluster.Type == "PRIMARY" || cluster.TimeStamp > primaryCluster.TimeStamp {
|
||||
primaryCluster = cluster
|
||||
}
|
||||
} else {
|
||||
if cluster.Type == "PRIMARY" && cluster.TimeStamp > primaryCluster.TimeStamp {
|
||||
primaryCluster = cluster
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return primaryCluster
|
||||
}
|
||||
|
||||
func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
||||
// return empty idc list
|
||||
param, err := getCommonParam(c)
|
||||
if err != nil {
|
||||
return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
|
||||
}
|
||||
|
||||
rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
|
||||
if err != nil {
|
||||
if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
|
||||
return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
|
||||
} else {
|
||||
return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
|
||||
}
|
||||
}
|
||||
|
||||
idcList := make([]*model.IdcRegionInfo, 0, 0)
|
||||
if param.Version < 2 || param.ObClusterId > 0 {
|
||||
primaryCluster := selectPrimaryCluster(rootServiceInfoList)
|
||||
obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
|
||||
Cluster: primaryCluster.ObCluster,
|
||||
ClusterId: primaryCluster.ObClusterId,
|
||||
IdcList: idcList,
|
||||
ReadonlyRsList: "",
|
||||
}
|
||||
return NewSuccessResponse(obClusterIdcRegionInfo)
|
||||
} else {
|
||||
obClusterIdcRegionInfoList := make([]*model.ObClusterIdcRegionInfo, 0, 4)
|
||||
for _, cluster := range rootServiceInfoList {
|
||||
obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
|
||||
Cluster: cluster.ObCluster,
|
||||
ClusterId: cluster.ObClusterId,
|
||||
IdcList: idcList,
|
||||
ReadonlyRsList: "",
|
||||
}
|
||||
obClusterIdcRegionInfoList = append(obClusterIdcRegionInfoList, obClusterIdcRegionInfo)
|
||||
}
|
||||
return NewSuccessResponse(obClusterIdcRegionInfoList)
|
||||
}
|
||||
}
|
||||
|
||||
func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
|
||||
var clusters []*ent.ObCluster
|
||||
var err error
|
||||
rootServiceInfoList := make([]*model.ObRootServiceInfo, 0, 4)
|
||||
client := GetConfigServer().Client
|
||||
|
||||
if obClusterId != 0 {
|
||||
log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
|
||||
clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).All(context.Background())
|
||||
} else {
|
||||
log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
|
||||
clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster)).All(context.Background())
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "query ob clusters from db")
|
||||
}
|
||||
if len(clusters) == 0 {
|
||||
return rootServiceInfoList, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
|
||||
}
|
||||
for _, cluster := range clusters {
|
||||
var rootServiceInfo model.ObRootServiceInfo
|
||||
err = json.Unmarshal([]byte(cluster.RootserviceJSON), &rootServiceInfo)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "deserialize root service info")
|
||||
}
|
||||
rootServiceInfo.Fill()
|
||||
rootServiceInfoList = append(rootServiceInfoList, &rootServiceInfo)
|
||||
}
|
||||
return rootServiceInfoList, nil
|
||||
}
|
||||
|
||||
func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
||||
var response *ApiResponse
|
||||
param, err := getCommonParam(c)
|
||||
if err != nil {
|
||||
return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
|
||||
}
|
||||
rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
|
||||
if err != nil {
|
||||
if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
|
||||
return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
|
||||
} else {
|
||||
return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
|
||||
}
|
||||
}
|
||||
|
||||
if param.Version < 2 || param.ObClusterId > 0 {
|
||||
log.WithContext(ctxlog).Infof("return primary ob cluster")
|
||||
response = NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
|
||||
} else {
|
||||
log.WithContext(ctxlog).Infof("return all ob clusters")
|
||||
response = NewSuccessResponse(rootServiceInfoList)
|
||||
}
|
||||
return response
|
||||
}
|
||||
|
||||
func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
||||
var response *ApiResponse
|
||||
client := GetConfigServer().Client
|
||||
obRootServiceInfo := new(model.ObRootServiceInfo)
|
||||
err := c.ShouldBindJSON(obRootServiceInfo)
|
||||
if err != nil {
|
||||
return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
|
||||
}
|
||||
obRootServiceInfo.Fill()
|
||||
param, err := getCommonParam(c)
|
||||
if err != nil {
|
||||
return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
|
||||
}
|
||||
if len(obRootServiceInfo.ObCluster) == 0 {
|
||||
return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
|
||||
}
|
||||
if param.Version > 1 {
|
||||
if len(obRootServiceInfo.Type) == 0 {
|
||||
return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
|
||||
}
|
||||
}
|
||||
|
||||
rsBytes, err := json.Marshal(obRootServiceInfo)
|
||||
if err != nil {
|
||||
response = NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
|
||||
} else {
|
||||
rootServiceInfoJson := string(rsBytes)
|
||||
log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)
|
||||
|
||||
err := client.ObCluster.
|
||||
Create().
|
||||
SetName(obRootServiceInfo.ObCluster).
|
||||
SetObClusterID(obRootServiceInfo.ObClusterId).
|
||||
SetType(obRootServiceInfo.Type).
|
||||
SetRootserviceJSON(rootServiceInfoJson).
|
||||
OnConflict().
|
||||
SetRootserviceJSON(rootServiceInfoJson).
|
||||
Exec(context.Background())
|
||||
if err != nil {
|
||||
response = NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
|
||||
} else {
|
||||
response = NewSuccessResponse("successful")
|
||||
}
|
||||
}
|
||||
return response
|
||||
}
|
||||
|
||||
func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
|
||||
var response *ApiResponse
|
||||
client := GetConfigServer().Client
|
||||
|
||||
param, err := getCommonParam(c)
|
||||
if err != nil {
|
||||
return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
|
||||
}
|
||||
if param.Version < 2 {
|
||||
response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
|
||||
} else if param.ObClusterId == 0 {
|
||||
response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
|
||||
} else {
|
||||
affected, err := client.ObCluster.
|
||||
Delete().
|
||||
Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
|
||||
Exec(context.Background())
|
||||
if err != nil {
|
||||
response = NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
|
||||
} else {
|
||||
log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
|
||||
response = NewSuccessResponse("success")
|
||||
}
|
||||
}
|
||||
return response
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user