// Files
// oceanbase/tools/ob-configserver/server/observer_handler.go
// 2022-05-30 16:44:36 +08:00
//
// 315 lines
// 10 KiB
// Go

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
package server
import (
"context"
"encoding/json"
"fmt"
"strconv"
"sync"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/oceanbase/configserver/ent"
"github.com/oceanbase/configserver/ent/obcluster"
"github.com/oceanbase/configserver/model"
)
// Cached gin handlers for the root service and idc region endpoints.
// Each handler variable is paired with the sync.Once that the matching
// get...Func accessor uses to build it exactly once.
var (
	obRootServiceGetOnce sync.Once
	obRootServiceGetFunc func(*gin.Context)

	obRootServicePostOnce sync.Once
	obRootServicePostFunc func(*gin.Context)

	obRootServiceDeleteOnce sync.Once
	obRootServiceDeleteFunc func(*gin.Context)

	obIdcRegionInfoOnce sync.Once
	obIdcRegionInfoFunc func(*gin.Context)
)
// getObIdcRegionInfoFunc returns the gin handler for idc region info queries.
// The handler is created on the first call by passing getObIdcRegionInfo to
// handlerFunctionWrapper; later calls return the cached value.
func getObIdcRegionInfoFunc() func(c *gin.Context) {
	build := func() {
		obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
	}
	obIdcRegionInfoOnce.Do(build)
	return obIdcRegionInfoFunc
}
// getObRootServiceGetFunc returns the gin handler that serves root service
// info lookups. The handler is created on the first call by passing
// getObRootServiceInfo to handlerFunctionWrapper; later calls return the
// cached value.
func getObRootServiceGetFunc() func(*gin.Context) {
	build := func() {
		obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
	}
	obRootServiceGetOnce.Do(build)
	return obRootServiceGetFunc
}
// getObRootServicePostFunc returns the gin handler that stores root service
// info. The handler is created on the first call by passing
// createOrUpdateObRootServiceInfo to handlerFunctionWrapper; later calls
// return the cached value.
func getObRootServicePostFunc() func(*gin.Context) {
	build := func() {
		obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
	}
	obRootServicePostOnce.Do(build)
	return obRootServicePostFunc
}
// getObRootServiceDeleteFunc returns the gin handler that removes root
// service info. The handler is created on the first call by passing
// deleteObRootServiceInfo to handlerFunctionWrapper; later calls return the
// cached value.
func getObRootServiceDeleteFunc() func(*gin.Context) {
	build := func() {
		obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
	}
	obRootServiceDeleteOnce.Do(build)
	return obRootServiceDeleteFunc
}
// RootServiceInfoParam carries the query parameters that getCommonParam
// extracts from a request.
type RootServiceInfoParam struct {
	// ObCluster is the cluster name, read from the "ObCluster" or
	// "ObRegion" query key.
	ObCluster string
	// ObClusterId is the cluster id, read from the "ObClusterId" or
	// "ObRegionId" query key; it is 0 when no id was supplied.
	ObClusterId int64
	// Version is the value of the "version" query key; it is 0 when the key
	// is absent.
	Version int
}
// getCommonParam reads the cluster name, cluster id and version from the
// request's query string.
//
// The name comes from "ObCluster" or "ObRegion" and the id from "ObClusterId"
// or "ObRegionId"; when both keys of a pair carry a value, the "ObRegion*"
// one wins. A key that is present but empty (e.g. "?ObRegion=") is ignored,
// so it cannot wipe out a value supplied under the other key. The id defaults
// to 0 when not supplied, and the version defaults to 0 when the "version"
// key is absent.
//
// It returns an error when no cluster name is supplied, when the id is not a
// valid base-10 integer, or when the "version" key is present but its value
// is not a valid integer.
func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
	// lastNonEmpty returns the value of the last listed key that is present
	// with a non-empty value, or "" when there is none.
	lastNonEmpty := func(keys ...string) string {
		picked := ""
		for _, key := range keys {
			if value, ok := c.GetQuery(key); ok && value != "" {
				picked = value
			}
		}
		return picked
	}

	name := lastNonEmpty("ObCluster", "ObRegion")
	if name == "" {
		return nil, errors.New("no obcluster or obregion")
	}

	var clusterId int64
	if clusterIdStr := lastNonEmpty("ObClusterId", "ObRegionId"); clusterIdStr != "" {
		parsed, err := strconv.ParseInt(clusterIdStr, 10, 64)
		if err != nil {
			return nil, errors.Wrap(err, "parse ob cluster id")
		}
		clusterId = parsed
	}

	version := 0
	if versionStr, ok := c.GetQuery("version"); ok {
		parsed, err := strconv.Atoi(versionStr)
		if err != nil {
			return nil, errors.Wrap(err, "parse version")
		}
		version = parsed
	}

	return &RootServiceInfoParam{
		ObCluster:   name,
		ObClusterId: clusterId,
		Version:     version,
	}, nil
}
// selectPrimaryCluster picks one entry out of clusters.
//
// An entry whose Type is "PRIMARY" is preferred over any other entry; among
// several "PRIMARY" entries the one with the greatest TimeStamp is chosen.
// When no entry is "PRIMARY", the one with the greatest TimeStamp is chosen.
// On equal TimeStamp the earlier entry is kept. It returns nil when clusters
// is empty.
func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
	var chosen *model.ObRootServiceInfo
	for _, candidate := range clusters {
		if chosen == nil {
			chosen = candidate
			continue
		}

		var replace bool
		if chosen.Type == "PRIMARY" {
			// Only a newer primary may displace the current primary.
			replace = candidate.Type == "PRIMARY" && candidate.TimeStamp > chosen.TimeStamp
		} else {
			// Any primary, or any newer entry, displaces a non-primary.
			replace = candidate.Type == "PRIMARY" || candidate.TimeStamp > chosen.TimeStamp
		}
		if replace {
			chosen = candidate
		}
	}
	return chosen
}
// getObIdcRegionInfo answers an idc region info query for the cluster named
// in the request's query parameters. The idc list in every returned entry is
// always empty and ReadonlyRsList is always "".
//
// When the version is below 2 or a cluster id was given, the response holds a
// single entry for the cluster chosen by selectPrimaryCluster; otherwise it
// holds one entry per matching cluster. A not-found response is returned
// when the lookup fails with an empty, non-nil list; any other failure yields
// an error response.
func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
	param, err := getCommonParam(c)
	if err != nil {
		return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
	}

	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
	if err != nil {
		// An empty but non-nil list alongside the error means nothing matched.
		nothingMatched := rootServiceInfoList != nil && len(rootServiceInfoList) == 0
		if nothingMatched {
			return NewNotFoundResponse(errors.Errorf("no obcluster found with query param %v", param))
		}
		return NewErrorResponse(errors.Wrapf(err, "get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId))
	}

	// Non-nil empty slice shared by every entry in the response.
	idcList := make([]*model.IdcRegionInfo, 0)

	singleCluster := param.Version < 2 || param.ObClusterId > 0
	if singleCluster {
		primary := selectPrimaryCluster(rootServiceInfoList)
		return NewSuccessResponse(&model.ObClusterIdcRegionInfo{
			Cluster:        primary.ObCluster,
			ClusterId:      primary.ObClusterId,
			IdcList:        idcList,
			ReadonlyRsList: "",
		})
	}

	regionInfos := make([]*model.ObClusterIdcRegionInfo, 0, 4)
	for _, rs := range rootServiceInfoList {
		regionInfos = append(regionInfos, &model.ObClusterIdcRegionInfo{
			Cluster:        rs.ObCluster,
			ClusterId:      rs.ObClusterId,
			IdcList:        idcList,
			ReadonlyRsList: "",
		})
	}
	return NewSuccessResponse(regionInfos)
}
// getRootServiceInfoList loads the stored root service info of every cluster
// row whose name equals obCluster. When obClusterId is non-zero the rows are
// additionally filtered by that id.
//
// Return values:
//   - (list, nil) with one decoded entry per matching row on success;
//   - (empty non-nil list, error) when no row matches;
//   - (nil, error) when the query fails or a row's JSON cannot be decoded.
func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
	var (
		clusters []*ent.ObCluster
		err      error
	)
	client := GetConfigServer().Client

	if obClusterId == 0 {
		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
		clusters, err = client.ObCluster.Query().
			Where(obcluster.Name(obCluster)).
			All(context.Background())
	} else {
		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
		clusters, err = client.ObCluster.Query().
			Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).
			All(context.Background())
	}
	if err != nil {
		return nil, errors.Wrap(err, "query ob clusters from db")
	}

	result := make([]*model.ObRootServiceInfo, 0, 4)
	if len(clusters) == 0 {
		// The empty, non-nil slice is returned together with the error.
		return result, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
	}

	for _, row := range clusters {
		info := new(model.ObRootServiceInfo)
		if err = json.Unmarshal([]byte(row.RootserviceJSON), info); err != nil {
			return nil, errors.Wrap(err, "deserialize root service info")
		}
		info.Fill()
		result = append(result, info)
	}
	return result, nil
}
// getObRootServiceInfo answers a root service info query for the cluster
// named in the request's query parameters.
//
// When the version is below 2 or a cluster id was given, the response holds
// the single entry chosen by selectPrimaryCluster; otherwise it holds the
// full list of matching clusters. A not-found response is returned when the
// lookup fails with an empty, non-nil list; any other failure yields an error
// response.
func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
	param, err := getCommonParam(c)
	if err != nil {
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
	}

	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
	if err != nil {
		// An empty but non-nil list alongside the error means nothing matched.
		nothingMatched := rootServiceInfoList != nil && len(rootServiceInfoList) == 0
		if nothingMatched {
			return NewNotFoundResponse(errors.Errorf("no obcluster found with query param %v", param))
		}
		return NewErrorResponse(errors.Wrapf(err, "get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId))
	}

	returnAll := param.Version >= 2 && param.ObClusterId <= 0
	if returnAll {
		log.WithContext(ctxlog).Infof("return all ob clusters")
		return NewSuccessResponse(rootServiceInfoList)
	}

	log.WithContext(ctxlog).Infof("return primary ob cluster")
	return NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
}
// createOrUpdateObRootServiceInfo stores the root service info sent in the
// request's JSON body.
//
// The body must carry a cluster name, and a cluster type as well when the
// "version" query parameter is greater than 1; otherwise an illegal-argument
// response is returned. The info is serialized back to JSON and written with
// a create-or-update statement; on conflict only the rootservice JSON column
// is set again.
//
// NOTE(review): OnConflict is called without conflict columns - confirm that
// the target database dialect accepts that.
func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
	client := GetConfigServer().Client

	info := &model.ObRootServiceInfo{}
	if err := c.ShouldBindJSON(info); err != nil {
		return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
	}
	info.Fill()

	param, err := getCommonParam(c)
	if err != nil {
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
	}

	if len(info.ObCluster) == 0 {
		return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
	}
	if param.Version > 1 && len(info.Type) == 0 {
		return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
	}

	rsBytes, err := json.Marshal(info)
	if err != nil {
		return NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
	}
	rootServiceInfoJson := string(rsBytes)
	log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)

	upsert := client.ObCluster.
		Create().
		SetName(info.ObCluster).
		SetObClusterID(info.ObClusterId).
		SetType(info.Type).
		SetRootserviceJSON(rootServiceInfoJson).
		OnConflict().
		SetRootserviceJSON(rootServiceInfoJson)
	if err := upsert.Exec(context.Background()); err != nil {
		return NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
	}
	return NewSuccessResponse("successful")
}
// deleteObRootServiceInfo deletes the stored rows whose name and cluster id
// match the request's query parameters.
//
// Deletion is refused with an illegal-argument response when the version is
// below 2 or when no (non-zero) cluster id is given. On success the number
// of affected rows is logged and a success response is returned.
func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
	client := GetConfigServer().Client

	param, err := getCommonParam(c)
	if err != nil {
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
	}

	switch {
	case param.Version < 2:
		return NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
	case param.ObClusterId == 0:
		return NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
	}

	affected, err := client.ObCluster.
		Delete().
		Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
		Exec(context.Background())
	if err != nil {
		return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
	}

	log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
	return NewSuccessResponse("success")
}