315 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			315 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| /**
 | |
|  * Copyright (c) 2021 OceanBase
 | |
|  * OceanBase CE is licensed under Mulan PubL v2.
 | |
|  * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | |
|  * You may obtain a copy of Mulan PubL v2 at:
 | |
|  *          http://license.coscl.org.cn/MulanPubL-2.0
 | |
|  * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | |
|  * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | |
|  * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | |
|  * See the Mulan PubL v2 for more details.
 | |
|  */
 | |
| 
 | |
| package server
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/gin-gonic/gin"
 | |
| 	"github.com/pkg/errors"
 | |
| 	log "github.com/sirupsen/logrus"
 | |
| 
 | |
| 	"github.com/oceanbase/configserver/ent"
 | |
| 	"github.com/oceanbase/configserver/ent/obcluster"
 | |
| 	"github.com/oceanbase/configserver/model"
 | |
| )
 | |
| 
 | |
| var obRootServiceGetOnce sync.Once
 | |
| var obRootServiceGetFunc func(*gin.Context)
 | |
| var obRootServicePostOnce sync.Once
 | |
| var obRootServicePostFunc func(*gin.Context)
 | |
| var obRootServiceDeleteOnce sync.Once
 | |
| var obRootServiceDeleteFunc func(*gin.Context)
 | |
| var obIdcRegionInfoOnce sync.Once
 | |
| var obIdcRegionInfoFunc func(*gin.Context)
 | |
| 
 | |
| func getObIdcRegionInfoFunc() func(c *gin.Context) {
 | |
| 	obIdcRegionInfoOnce.Do(func() {
 | |
| 		obIdcRegionInfoFunc = handlerFunctionWrapper(getObIdcRegionInfo)
 | |
| 
 | |
| 	})
 | |
| 	return obIdcRegionInfoFunc
 | |
| }
 | |
| 
 | |
| func getObRootServiceGetFunc() func(*gin.Context) {
 | |
| 	obRootServiceGetOnce.Do(func() {
 | |
| 		obRootServiceGetFunc = handlerFunctionWrapper(getObRootServiceInfo)
 | |
| 	})
 | |
| 	return obRootServiceGetFunc
 | |
| }
 | |
| 
 | |
| func getObRootServicePostFunc() func(*gin.Context) {
 | |
| 	obRootServicePostOnce.Do(func() {
 | |
| 		obRootServicePostFunc = handlerFunctionWrapper(createOrUpdateObRootServiceInfo)
 | |
| 	})
 | |
| 	return obRootServicePostFunc
 | |
| }
 | |
| 
 | |
| func getObRootServiceDeleteFunc() func(*gin.Context) {
 | |
| 	obRootServiceDeleteOnce.Do(func() {
 | |
| 		obRootServiceDeleteFunc = handlerFunctionWrapper(deleteObRootServiceInfo)
 | |
| 	})
 | |
| 	return obRootServiceDeleteFunc
 | |
| }
 | |
| 
 | |
| type RootServiceInfoParam struct {
 | |
| 	ObCluster   string
 | |
| 	ObClusterId int64
 | |
| 	Version     int
 | |
| }
 | |
| 
 | |
| func getCommonParam(c *gin.Context) (*RootServiceInfoParam, error) {
 | |
| 	var err error
 | |
| 	name := ""
 | |
| 	obCluster, obClusterOk := c.GetQuery("ObCluster")
 | |
| 	obRegion, obRegionOk := c.GetQuery("ObRegion")
 | |
| 	if obClusterOk {
 | |
| 		name = obCluster
 | |
| 	}
 | |
| 	if obRegionOk {
 | |
| 		name = obRegion
 | |
| 	}
 | |
| 
 | |
| 	if len(name) == 0 {
 | |
| 		return nil, errors.New("no obcluster or obregion")
 | |
| 	}
 | |
| 
 | |
| 	obClusterId, obClusterIdOk := c.GetQuery("ObClusterId")
 | |
| 	obRegionId, obRegionIdOk := c.GetQuery("ObRegionId")
 | |
| 
 | |
| 	var clusterId int64
 | |
| 	var clusterIdStr string
 | |
| 	if obClusterIdOk {
 | |
| 		clusterIdStr = obClusterId
 | |
| 	}
 | |
| 	if obRegionIdOk {
 | |
| 		clusterIdStr = obRegionId
 | |
| 	}
 | |
| 	if clusterIdStr != "" {
 | |
| 		clusterId, err = strconv.ParseInt(clusterIdStr, 10, 64)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Wrap(err, "parse ob cluster id")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	version := 0
 | |
| 	versionStr, versionOk := c.GetQuery("version")
 | |
| 	if versionOk {
 | |
| 		version, err = strconv.Atoi(versionStr)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Wrap(err, "parse version")
 | |
| 		}
 | |
| 	}
 | |
| 	return &RootServiceInfoParam{
 | |
| 		ObCluster:   name,
 | |
| 		ObClusterId: clusterId,
 | |
| 		Version:     version,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func selectPrimaryCluster(clusters []*model.ObRootServiceInfo) *model.ObRootServiceInfo {
 | |
| 	var primaryCluster *model.ObRootServiceInfo
 | |
| 	for _, cluster := range clusters {
 | |
| 		if primaryCluster == nil {
 | |
| 			primaryCluster = cluster
 | |
| 		} else {
 | |
| 			if primaryCluster.Type != "PRIMARY" {
 | |
| 				if cluster.Type == "PRIMARY" || cluster.TimeStamp > primaryCluster.TimeStamp {
 | |
| 					primaryCluster = cluster
 | |
| 				}
 | |
| 			} else {
 | |
| 				if cluster.Type == "PRIMARY" && cluster.TimeStamp > primaryCluster.TimeStamp {
 | |
| 					primaryCluster = cluster
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	return primaryCluster
 | |
| }
 | |
| 
 | |
| func getObIdcRegionInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | |
| 	// return empty idc list
 | |
| 	param, err := getCommonParam(c)
 | |
| 	if err != nil {
 | |
| 		return NewErrorResponse(errors.Wrap(err, "parse ob idc region info query parameter"))
 | |
| 	}
 | |
| 
 | |
| 	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
 | |
| 	if err != nil {
 | |
| 		if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
 | |
| 			return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
 | |
| 		} else {
 | |
| 			return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	idcList := make([]*model.IdcRegionInfo, 0, 0)
 | |
| 	if param.Version < 2 || param.ObClusterId > 0 {
 | |
| 		primaryCluster := selectPrimaryCluster(rootServiceInfoList)
 | |
| 		obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
 | |
| 			Cluster:        primaryCluster.ObCluster,
 | |
| 			ClusterId:      primaryCluster.ObClusterId,
 | |
| 			IdcList:        idcList,
 | |
| 			ReadonlyRsList: "",
 | |
| 		}
 | |
| 		return NewSuccessResponse(obClusterIdcRegionInfo)
 | |
| 	} else {
 | |
| 		obClusterIdcRegionInfoList := make([]*model.ObClusterIdcRegionInfo, 0, 4)
 | |
| 		for _, cluster := range rootServiceInfoList {
 | |
| 			obClusterIdcRegionInfo := &model.ObClusterIdcRegionInfo{
 | |
| 				Cluster:        cluster.ObCluster,
 | |
| 				ClusterId:      cluster.ObClusterId,
 | |
| 				IdcList:        idcList,
 | |
| 				ReadonlyRsList: "",
 | |
| 			}
 | |
| 			obClusterIdcRegionInfoList = append(obClusterIdcRegionInfoList, obClusterIdcRegionInfo)
 | |
| 		}
 | |
| 		return NewSuccessResponse(obClusterIdcRegionInfoList)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func getRootServiceInfoList(ctxlog context.Context, obCluster string, obClusterId int64) ([]*model.ObRootServiceInfo, error) {
 | |
| 	var clusters []*ent.ObCluster
 | |
| 	var err error
 | |
| 	rootServiceInfoList := make([]*model.ObRootServiceInfo, 0, 4)
 | |
| 	client := GetConfigServer().Client
 | |
| 
 | |
| 	if obClusterId != 0 {
 | |
| 		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s and obcluster_id %d", obCluster, obClusterId)
 | |
| 		clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster), obcluster.ObClusterID(obClusterId)).All(context.Background())
 | |
| 	} else {
 | |
| 		log.WithContext(ctxlog).Infof("query ob clusters with obcluster %s", obCluster)
 | |
| 		clusters, err = client.ObCluster.Query().Where(obcluster.Name(obCluster)).All(context.Background())
 | |
| 	}
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Wrap(err, "query ob clusters from db")
 | |
| 	}
 | |
| 	if len(clusters) == 0 {
 | |
| 		return rootServiceInfoList, errors.New(fmt.Sprintf("no root service info found with obcluster %s, obcluster id %d", obCluster, obClusterId))
 | |
| 	}
 | |
| 	for _, cluster := range clusters {
 | |
| 		var rootServiceInfo model.ObRootServiceInfo
 | |
| 		err = json.Unmarshal([]byte(cluster.RootserviceJSON), &rootServiceInfo)
 | |
| 		if err != nil {
 | |
| 			return nil, errors.Wrap(err, "deserialize root service info")
 | |
| 		}
 | |
| 		rootServiceInfo.Fill()
 | |
| 		rootServiceInfoList = append(rootServiceInfoList, &rootServiceInfo)
 | |
| 	}
 | |
| 	return rootServiceInfoList, nil
 | |
| }
 | |
| 
 | |
| func getObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | |
| 	var response *ApiResponse
 | |
| 	param, err := getCommonParam(c)
 | |
| 	if err != nil {
 | |
| 		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
 | |
| 	}
 | |
| 	rootServiceInfoList, err := getRootServiceInfoList(ctxlog, param.ObCluster, param.ObClusterId)
 | |
| 	if err != nil {
 | |
| 		if rootServiceInfoList != nil && len(rootServiceInfoList) == 0 {
 | |
| 			return NewNotFoundResponse(errors.New(fmt.Sprintf("no obcluster found with query param %v", param)))
 | |
| 		} else {
 | |
| 			return NewErrorResponse(errors.Wrap(err, fmt.Sprintf("get all rootservice info for cluster %s:%d", param.ObCluster, param.ObClusterId)))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if param.Version < 2 || param.ObClusterId > 0 {
 | |
| 		log.WithContext(ctxlog).Infof("return primary ob cluster")
 | |
| 		response = NewSuccessResponse(selectPrimaryCluster(rootServiceInfoList))
 | |
| 	} else {
 | |
| 		log.WithContext(ctxlog).Infof("return all ob clusters")
 | |
| 		response = NewSuccessResponse(rootServiceInfoList)
 | |
| 	}
 | |
| 	return response
 | |
| }
 | |
| 
 | |
| func createOrUpdateObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | |
| 	var response *ApiResponse
 | |
| 	client := GetConfigServer().Client
 | |
| 	obRootServiceInfo := new(model.ObRootServiceInfo)
 | |
| 	err := c.ShouldBindJSON(obRootServiceInfo)
 | |
| 	if err != nil {
 | |
| 		return NewErrorResponse(errors.Wrap(err, "bind rootservice query parameter"))
 | |
| 	}
 | |
| 	obRootServiceInfo.Fill()
 | |
| 	param, err := getCommonParam(c)
 | |
| 	if err != nil {
 | |
| 		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
 | |
| 	}
 | |
| 	if len(obRootServiceInfo.ObCluster) == 0 {
 | |
| 		return NewIllegalArgumentResponse(errors.New("ob cluster name is required"))
 | |
| 	}
 | |
| 	if param.Version > 1 {
 | |
| 		if len(obRootServiceInfo.Type) == 0 {
 | |
| 			return NewIllegalArgumentResponse(errors.New("ob cluster type is required when version > 1"))
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	rsBytes, err := json.Marshal(obRootServiceInfo)
 | |
| 	if err != nil {
 | |
| 		response = NewErrorResponse(errors.Wrap(err, "serialize ob rootservice info"))
 | |
| 	} else {
 | |
| 		rootServiceInfoJson := string(rsBytes)
 | |
| 		log.WithContext(ctxlog).Infof("store rootservice info %s", rootServiceInfoJson)
 | |
| 
 | |
| 		err := client.ObCluster.
 | |
| 			Create().
 | |
| 			SetName(obRootServiceInfo.ObCluster).
 | |
| 			SetObClusterID(obRootServiceInfo.ObClusterId).
 | |
| 			SetType(obRootServiceInfo.Type).
 | |
| 			SetRootserviceJSON(rootServiceInfoJson).
 | |
| 			OnConflict().
 | |
| 			SetRootserviceJSON(rootServiceInfoJson).
 | |
| 			Exec(context.Background())
 | |
| 		if err != nil {
 | |
| 			response = NewErrorResponse(errors.Wrap(err, "save ob rootservice info"))
 | |
| 		} else {
 | |
| 			response = NewSuccessResponse("successful")
 | |
| 		}
 | |
| 	}
 | |
| 	return response
 | |
| }
 | |
| 
 | |
| func deleteObRootServiceInfo(ctxlog context.Context, c *gin.Context) *ApiResponse {
 | |
| 	var response *ApiResponse
 | |
| 	client := GetConfigServer().Client
 | |
| 
 | |
| 	param, err := getCommonParam(c)
 | |
| 	if err != nil {
 | |
| 		return NewErrorResponse(errors.Wrap(err, "parse rootservice query parameter"))
 | |
| 	}
 | |
| 	if param.Version < 2 {
 | |
| 		response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported when version >= 2"))
 | |
| 	} else if param.ObClusterId == 0 {
 | |
| 		response = NewIllegalArgumentResponse(errors.New("delete obcluster rs info is only supported with obcluster id"))
 | |
| 	} else {
 | |
| 		affected, err := client.ObCluster.
 | |
| 			Delete().
 | |
| 			Where(obcluster.Name(param.ObCluster), obcluster.ObClusterID(param.ObClusterId)).
 | |
| 			Exec(context.Background())
 | |
| 		if err != nil {
 | |
| 			response = NewErrorResponse(errors.Wrap(err, fmt.Sprintf("delete obcluster %s with ob cluster id %d in db", param.ObCluster, param.ObClusterId)))
 | |
| 		} else {
 | |
| 			log.WithContext(ctxlog).Infof("delete obcluster %s with ob cluster id %d in db, affected rows %d", param.ObCluster, param.ObClusterId, affected)
 | |
| 			response = NewSuccessResponse("success")
 | |
| 		}
 | |
| 	}
 | |
| 	return response
 | |
| 
 | |
| }
 | 
