315 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			315 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
 | 
						|
 | 
						|
package server
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"strconv"
 | 
						|
	"sync"
 | 
						|
 | 
						|
	"github.com/gin-gonic/gin"
 | 
						|
	"github.com/pkg/errors"
 | 
						|
	log "github.com/sirupsen/logrus"
 | 
						|
 | 
						|
	"github.com/oceanbase/configserver/ent"
 | 
						|
	"github.com/oceanbase/configserver/ent/obcluster"
 | 
						|
	"github.com/oceanbase/configserver/model"
 | 
						|
)
 | 
						|
 | 
						|
var obRootServiceGetOnce sync.Once
 | 
						|
var obRootServiceGetFunc func(*gin.Context)
 | 
						|
var obRootServicePostOnce sync.Once
 | 
						|
var obRootServicePostFunc func(*gin.Context)
 | 
						|
var obRootServiceDeleteOnce sync.Once
 | 
						|
var obRootServiceDeleteFunc func(*gin.Context)
 | 
						|
var obIdcRegionInfoOnce sync.Once
 | 
						|
var obIdcRegionInfoFunc func(*gin.Context)
 | 
						|
 | 
						|
func getObIdcRegionInfoFunc() func(c *gin.Context) {
 | 
						|
	obIdcRegionInfoOnce.Do(func() {
 | 
						|
		obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
 | 
						|
 | 
						|
	})
 | 
						|
	return obIdcRegionInfoFunc
 | 
						|
}
 | 
						|
 | 
						|
func getObRootServiceGetFunc() func(*gin.Context) {
 | 
						|
	obRootServiceGetOnce.Do(func() {
 | 
						|
		obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
 | 
						|
	})
 | 
						|
	return obRootServiceGetFunc
 | 
						|
}
 | 
						|
 | 
						|
func getObRootServicePostFunc() func(*gin.Context) {
 | 
						|
	obRootServicePostOnce.Do(func() {
 | 
						|
		obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
 | 
						|
	})
 | 
						|
	return obRootServicePostFunc
 | 
						|
}
 | 
						|
 | 
						|
func getObRootServiceDeleteFunc() func(*gin.Context) {
 | 
						|
	obRootServiceDeleteOnce.Do(func() {
 | 
						|
		obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
 | 
						|
	})
 | 
						|
	return obRootServiceDeleteFunc
 | 
						|
}
 | 
						|
 | 
						|
// RootServiceInfoParam holds the query parameters shared by the rootservice
// and idc region handlers, as parsed by getCommonParam.
type RootServiceInfoParam struct {
	// ObCluster is the cluster name, read from "ObCluster" or "ObRegion".
	ObCluster string
	// ObClusterId is the cluster id, read from "ObClusterId" or "ObRegionId";
	// it stays 0 when the request carries no id.
	ObClusterId int64
	// Version is the value of the "version" parameter, 0 when it is absent.
	Version int
}
 | 
						|
 | 
						|
func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
 | 
						|
	var err error
 | 
						|
	name := ""
 | 
						|
	obCluster, obClusterOk := c.GetQuery("ObCluster")
 | 
						|
	obRegion, obRegionOk := c.GetQuery("ObRegion")
 | 
						|
	if obClusterOk {
 | 
						|
		name = obCluster
 | 
						|
	}
 | 
						|
	if obRegionOk {
 | 
						|
		name = obRegion
 | 
						|
	}
 | 
						|
 | 
						|
	if len(name) == 0 {
 | 
						|
		return nil, errors.New("no obcluster or obregion")
 | 
						|
	}
 | 
						|
 | 
						|
	obClusterId, obClusterIdOk := c.GetQuery("ObClusterId")
 | 
						|
	obRegionId, obRegionIdOk := c.GetQuery("ObRegionId")
 | 
						|
 | 
						|
	var clusterId int64
 | 
						|
	var clusterIdStr string
 | 
						|
	if obClusterIdOk {
 | 
						|
		clusterIdStr = obClusterId
 | 
						|
	}
 | 
						|
	if obRegionIdOk {
 | 
						|
		clusterIdStr = obRegionId
 | 
						|
	}
 | 
						|
	if clusterIdStr != "" {
 | 
						|
		clusterId, err = strconv.ParseInt(clusterIdStr, 10, 64)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Wrap(err, "parse ob cluster id")
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	version := 0
 | 
						|
	versionStr, versionOk := c.GetQuery("version")
 | 
						|
	if versionOk {
 | 
						|
		version, err = strconv.Atoi(versionStr)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Wrap(err, "parse version")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return &RootServiceInfoParam{
 | 
						|
		ObCluster:   name,
 | 
						|
		ObClusterId: clusterId,
 | 
						|
		Version:     version,
 | 
						|
	}, nil
 | 
						|
}
 | 
						|
 | 
						|
func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
 | 
						|
	var primaryCluster *model.ObRootServiceInfo
 | 
						|
	for _, cluster := range clusters {
 | 
						|
		if primaryCluster == nil {
 | 
						|
			primaryCluster = cluster
 | 
						|
		} else {
 | 
						|
			if primaryCluster.Type != "PRIMARY" {
 | 
						|
				if cluster.Type == "PRIMARY" || cluster.TimeStamp > primaryCluster.TimeStamp {
 | 
						|
					primaryCluster = cluster
 | 
						|
				}
 | 
						|
			} else {
 | 
						|
				if cluster.Type == "PRIMARY" && cluster.TimeStamp > primaryCluster.TimeStamp {
 | 
						|
					primaryCluster = cluster
 | 
						|
				}
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return primaryCluster
 | 
						|
}
 | 
						|
 | 
						|
func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | 
						|
	// return empty idc list
 | 
						|
	param, err := getCommonParam(c)
 | 
						|
	if err != nil {
 | 
						|
		return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
 | 
						|
	}
 | 
						|
 | 
						|
	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
 | 
						|
	if err != nil {
 | 
						|
		if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
 | 
						|
			return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
 | 
						|
		} else {
 | 
						|
			return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	idcList := make([]*model.IdcRegionInfo, 0, 0)
 | 
						|
	if param.Version < 2 || param.ObClusterId > 0 {
 | 
						|
		primaryCluster := selectPrimaryCluster(rootServiceInfoList)
 | 
						|
		obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
 | 
						|
			Cluster:        primaryCluster.ObCluster,
 | 
						|
			ClusterId:      primaryCluster.ObClusterId,
 | 
						|
			IdcList:        idcList,
 | 
						|
			ReadonlyRsList: "",
 | 
						|
		}
 | 
						|
		return NewSuccessResponse(obClusterIdcRegionInfo)
 | 
						|
	} else {
 | 
						|
		obClusterIdcRegionInfoList := make([]*model.ObClusterIdcRegionInfo, 0, 4)
 | 
						|
		for _, cluster := range rootServiceInfoList {
 | 
						|
			obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
 | 
						|
				Cluster:        cluster.ObCluster,
 | 
						|
				ClusterId:      cluster.ObClusterId,
 | 
						|
				IdcList:        idcList,
 | 
						|
				ReadonlyRsList: "",
 | 
						|
			}
 | 
						|
			obClusterIdcRegionInfoList = append(obClusterIdcRegionInfoList, obClusterIdcRegionInfo)
 | 
						|
		}
 | 
						|
		return NewSuccessResponse(obClusterIdcRegionInfoList)
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
 | 
						|
	var clusters []*ent.ObCluster
 | 
						|
	var err error
 | 
						|
	rootServiceInfoList := make([]*model.ObRootServiceInfo, 0, 4)
 | 
						|
	client := GetConfigServer().Client
 | 
						|
 | 
						|
	if obClusterId != 0 {
 | 
						|
		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
 | 
						|
		clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).All(context.Background())
 | 
						|
	} else {
 | 
						|
		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
 | 
						|
		clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster)).All(context.Background())
 | 
						|
	}
 | 
						|
	if err != nil {
 | 
						|
		return nil, errors.Wrap(err, "query ob clusters from db")
 | 
						|
	}
 | 
						|
	if len(clusters) == 0 {
 | 
						|
		return rootServiceInfoList, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
 | 
						|
	}
 | 
						|
	for _, cluster := range clusters {
 | 
						|
		var rootServiceInfo model.ObRootServiceInfo
 | 
						|
		err = json.Unmarshal([]byte(cluster.RootserviceJSON), &rootServiceInfo)
 | 
						|
		if err != nil {
 | 
						|
			return nil, errors.Wrap(err, "deserialize root service info")
 | 
						|
		}
 | 
						|
		rootServiceInfo.Fill()
 | 
						|
		rootServiceInfoList = append(rootServiceInfoList, &rootServiceInfo)
 | 
						|
	}
 | 
						|
	return rootServiceInfoList, nil
 | 
						|
}
 | 
						|
 | 
						|
func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | 
						|
	var response *ApiResponse
 | 
						|
	param, err := getCommonParam(c)
 | 
						|
	if err != nil {
 | 
						|
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
 | 
						|
	}
 | 
						|
	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
 | 
						|
	if err != nil {
 | 
						|
		if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
 | 
						|
			return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
 | 
						|
		} else {
 | 
						|
			return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	if param.Version < 2 || param.ObClusterId > 0 {
 | 
						|
		log.WithContext(ctxlog).Infof("return primary ob cluster")
 | 
						|
		response = NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
 | 
						|
	} else {
 | 
						|
		log.WithContext(ctxlog).Infof("return all ob clusters")
 | 
						|
		response = NewSuccessResponse(rootServiceInfoList)
 | 
						|
	}
 | 
						|
	return response
 | 
						|
}
 | 
						|
 | 
						|
func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | 
						|
	var response *ApiResponse
 | 
						|
	client := GetConfigServer().Client
 | 
						|
	obRootServiceInfo := new(model.ObRootServiceInfo)
 | 
						|
	err := c.ShouldBindJSON(obRootServiceInfo)
 | 
						|
	if err != nil {
 | 
						|
		return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
 | 
						|
	}
 | 
						|
	obRootServiceInfo.Fill()
 | 
						|
	param, err := getCommonParam(c)
 | 
						|
	if err != nil {
 | 
						|
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
 | 
						|
	}
 | 
						|
	if len(obRootServiceInfo.ObCluster) == 0 {
 | 
						|
		return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
 | 
						|
	}
 | 
						|
	if param.Version > 1 {
 | 
						|
		if len(obRootServiceInfo.Type) == 0 {
 | 
						|
			return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	rsBytes, err := json.Marshal(obRootServiceInfo)
 | 
						|
	if err != nil {
 | 
						|
		response = NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
 | 
						|
	} else {
 | 
						|
		rootServiceInfoJson := string(rsBytes)
 | 
						|
		log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)
 | 
						|
 | 
						|
		err := client.ObCluster.
 | 
						|
			Create().
 | 
						|
			SetName(obRootServiceInfo.ObCluster).
 | 
						|
			SetObClusterID(obRootServiceInfo.ObClusterId).
 | 
						|
			SetType(obRootServiceInfo.Type).
 | 
						|
			SetRootserviceJSON(rootServiceInfoJson).
 | 
						|
			OnConflict().
 | 
						|
			SetRootserviceJSON(rootServiceInfoJson).
 | 
						|
			Exec(context.Background())
 | 
						|
		if err != nil {
 | 
						|
			response = NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
 | 
						|
		} else {
 | 
						|
			response = NewSuccessResponse("successful")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return response
 | 
						|
}
 | 
						|
 | 
						|
func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | 
						|
	var response *ApiResponse
 | 
						|
	client := GetConfigServer().Client
 | 
						|
 | 
						|
	param, err := getCommonParam(c)
 | 
						|
	if err != nil {
 | 
						|
		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
 | 
						|
	}
 | 
						|
	if param.Version < 2 {
 | 
						|
		response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
 | 
						|
	} else if param.ObClusterId == 0 {
 | 
						|
		response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
 | 
						|
	} else {
 | 
						|
		affected, err := client.ObCluster.
 | 
						|
			Delete().
 | 
						|
			Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
 | 
						|
			Exec(context.Background())
 | 
						|
		if err != nil {
 | 
						|
			response = NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
 | 
						|
		} else {
 | 
						|
			log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
 | 
						|
			response = NewSuccessResponse("success")
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return response
 | 
						|
 | 
						|
}
 |