// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tikvhandler

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"math"
	"net/http"
	"net/url"
	"runtime"
	"strconv"
	"strings"
	"sync/atomic"
	"time"

	"github.com/gorilla/mux"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/log"
	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/domain/infosync"
	"github.com/pingcap/tidb/pkg/domain/serverinfo"
	"github.com/pingcap/tidb/pkg/executor"
	"github.com/pingcap/tidb/pkg/infoschema"
	infoschemacontext "github.com/pingcap/tidb/pkg/infoschema/context"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/pkg/meta"
	"github.com/pingcap/tidb/pkg/meta/metadef"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/parser/terror"
	"github.com/pingcap/tidb/pkg/server/handler"
	"github.com/pingcap/tidb/pkg/session"
	"github.com/pingcap/tidb/pkg/session/sessionapi"
	"github.com/pingcap/tidb/pkg/session/txninfo"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessionctx/variable"
	"github.com/pingcap/tidb/pkg/store/gcworker"
	"github.com/pingcap/tidb/pkg/store/helper"
	"github.com/pingcap/tidb/pkg/table"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/codec"
	"github.com/pingcap/tidb/pkg/util/deadlockhistory"
	"github.com/pingcap/tidb/pkg/util/gcutil"
	"github.com/pingcap/tidb/pkg/util/hack"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/sqlexec"
	"github.com/tikv/client-go/v2/tikv"
	"github.com/tikv/pd/client/clients/router"
	pd "github.com/tikv/pd/client/http"
	"github.com/tikv/pd/client/opt"
	"go.uber.org/zap"
)

const requestDefaultTimeout = 10 * time.Second

// SettingsHandler is the handler for listing TiDB server settings.
type SettingsHandler struct {
	*handler.TikvHandlerTool
}

// NewSettingsHandler creates a new SettingsHandler.
func NewSettingsHandler(tool *handler.TikvHandlerTool) *SettingsHandler {
	return &SettingsHandler{tool}
}

// SchemaHandler is the handler for listing database or table schemas.
type SchemaHandler struct {
	*handler.TikvHandlerTool
}

// NewSchemaHandler creates a new SchemaHandler.
func NewSchemaHandler(tool *handler.TikvHandlerTool) *SchemaHandler {
	return &SchemaHandler{tool}
}

// SchemaStorageHandler is the handler for listing the storage information of databases or tables.
type SchemaStorageHandler struct {
	*handler.TikvHandlerTool
}

// NewSchemaStorageHandler creates a new SchemaStorageHandler.
func NewSchemaStorageHandler(tool *handler.TikvHandlerTool) *SchemaStorageHandler {
	return &SchemaStorageHandler{tool}
}

// DBTableHandler is the handler for getting database and table information by table ID.
type DBTableHandler struct {
	*handler.TikvHandlerTool
}

// NewDBTableHandler creates a new DBTableHandler.
func NewDBTableHandler(tool *handler.TikvHandlerTool) *DBTableHandler {
	return &DBTableHandler{tool}
}

// FlashReplicaHandler is the handler for flash replica.
type FlashReplicaHandler struct {
	*handler.TikvHandlerTool
}

// NewFlashReplicaHandler creates a new FlashReplicaHandler.
func NewFlashReplicaHandler(tool *handler.TikvHandlerTool) *FlashReplicaHandler {
	return &FlashReplicaHandler{tool}
}

// RegionHandler is the common field for HTTP region handlers. It contains
// some common functions shared by all handlers.
type RegionHandler struct {
	*handler.TikvHandlerTool
}

// NewRegionHandler creates a new RegionHandler.
func NewRegionHandler(tool *handler.TikvHandlerTool) *RegionHandler {
	return &RegionHandler{tool}
}

// TableHandler is the handler for listing a table's regions.
type TableHandler struct {
	*handler.TikvHandlerTool
	op string
}

// NewTableHandler creates a new TableHandler.
func NewTableHandler(tool *handler.TikvHandlerTool, op string) *TableHandler {
	return &TableHandler{tool, op}
}

// DDLHistoryJobHandler is the handler for listing DDL job history.
type DDLHistoryJobHandler struct {
	*handler.TikvHandlerTool
}

// NewDDLHistoryJobHandler creates a new DDLHistoryJobHandler.
func NewDDLHistoryJobHandler(tool *handler.TikvHandlerTool) *DDLHistoryJobHandler {
	return &DDLHistoryJobHandler{tool}
}

// DDLResignOwnerHandler is the handler for resigning the DDL owner.
type DDLResignOwnerHandler struct {
	store kv.Storage
}

// NewDDLResignOwnerHandler creates a new DDLResignOwnerHandler.
func NewDDLResignOwnerHandler(store kv.Storage) *DDLResignOwnerHandler {
	return &DDLResignOwnerHandler{store}
}

// ServerInfoHandler is the handler for getting the server info.
type ServerInfoHandler struct {
	*handler.TikvHandlerTool
}

// NewServerInfoHandler creates a new ServerInfoHandler.
func NewServerInfoHandler(tool *handler.TikvHandlerTool) *ServerInfoHandler {
	return &ServerInfoHandler{tool}
}

// AllServerInfoHandler is the handler for getting all servers' information.
type AllServerInfoHandler struct {
	*handler.TikvHandlerTool
}

// NewAllServerInfoHandler creates a new AllServerInfoHandler.
func NewAllServerInfoHandler(tool *handler.TikvHandlerTool) *AllServerInfoHandler {
	return &AllServerInfoHandler{tool}
}

// ProfileHandler is the handler for getting the profile.
type ProfileHandler struct {
	*handler.TikvHandlerTool
}

// NewProfileHandler creates a new ProfileHandler.
func NewProfileHandler(tool *handler.TikvHandlerTool) *ProfileHandler {
	return &ProfileHandler{tool}
}

// DDLHookHandler is the handler for using a pre-defined DDL callback.
// It's convenient for providing some APIs for integration tests.
type DDLHookHandler struct{}

// ValueHandler is the handler for getting a decoded value.
type ValueHandler struct{}

// LabelHandler is the handler for setting labels.
type LabelHandler struct{}

const (
	// OpTableRegions is the operation for getting regions of a table.
	OpTableRegions = "regions"
	// OpTableRanges is the operation for getting ranges of a table.
	OpTableRanges = "ranges"
	// OpTableDiskUsage is the operation for getting disk usage of a table.
	OpTableDiskUsage = "disk-usage"
	// OpTableScatter is the operation for scattering a table.
	OpTableScatter = "scatter-table"
	// OpStopTableScatter is the operation for stopping scattering a table.
	OpStopTableScatter = "stop-scatter-table"
)

// MvccTxnHandler is the handler for the txn debugger.
type MvccTxnHandler struct {
	*handler.TikvHandlerTool
	op string
}

// NewMvccTxnHandler creates a new MvccTxnHandler.
func NewMvccTxnHandler(tool *handler.TikvHandlerTool, op string) *MvccTxnHandler {
	return &MvccTxnHandler{tool, op}
}

const (
	// OpMvccGetByHex is the operation for getting an MVCC value by a hex-format key.
	OpMvccGetByHex = "hex"
	// OpMvccGetByKey is the operation for getting an MVCC value by key.
	OpMvccGetByKey = "key"
	// OpMvccGetByIdx is the operation for getting an MVCC value by index.
	OpMvccGetByIdx = "idx"
	// OpMvccGetByTxn is the operation for getting an MVCC value by txn.
	OpMvccGetByTxn = "txn"
)
// ServeHTTP handles requests to decode a row value, given the column's ID,
// type, flag, and length.
func (ValueHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// parse params
	params := mux.Vars(req)

	colID, err := strconv.ParseInt(params[handler.ColumnID], 0, 64)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	colTp, err := strconv.ParseInt(params[handler.ColumnTp], 0, 64)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	colFlag, err := strconv.ParseUint(params[handler.ColumnFlag], 0, 64)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	colLen, err := strconv.ParseInt(params[handler.ColumnLen], 0, 64)
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	// Get the unchanged binary.
	if req.URL == nil {
		err = errors.BadRequestf("Invalid URL")
		handler.WriteError(w, err)
		return
	}
	values := make(url.Values)

	err = parseQuery(req.URL.RawQuery, values, false)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	if len(values[handler.RowBin]) != 1 {
		err = errors.BadRequestf("Invalid Query:%v", values[handler.RowBin])
		handler.WriteError(w, err)
		return
	}
	bin := values[handler.RowBin][0]
	valData, err := base64.StdEncoding.DecodeString(bin)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	// Construct the field type.
	defaultDecimal := 6
	ft := types.NewFieldTypeBuilder().SetType(byte(colTp)).SetFlag(uint(colFlag)).SetFlen(int(colLen)).SetDecimal(defaultDecimal).BuildP()
	// Decode a column.
	m := make(map[int64]*types.FieldType, 1)
	m[colID] = ft
	loc := time.UTC
	vals, err := tablecodec.DecodeRowToDatumMap(valData, m, loc)
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	v := vals[colID]
	val, err := v.ToString()
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	handler.WriteData(w, val)
}

// TableRegions is the response data for listing a table's regions.
// It contains region lists for the records and for the indices.
type TableRegions struct {
	TableName     string               `json:"name"`
	TableID       int64                `json:"id"`
	RecordRegions []handler.RegionMeta `json:"record_regions"`
	Indices       []IndexRegions       `json:"indices"`
}

// RangeDetail contains detail information about a particular range.
type RangeDetail struct {
	StartKey    []byte `json:"start_key"`
	EndKey      []byte `json:"end_key"`
	StartKeyHex string `json:"start_key_hex"`
	EndKeyHex   string `json:"end_key_hex"`
}

func createRangeDetail(start, end []byte) RangeDetail {
	return RangeDetail{
		StartKey:    start,
		EndKey:      end,
		StartKeyHex: hex.EncodeToString(start),
		EndKeyHex:   hex.EncodeToString(end),
	}
}

// TableRanges is the response data for listing a table's ranges.
// It contains range details for the records and the indices, as well as for the whole table.
type TableRanges struct {
	TableName string                 `json:"name"`
	TableID   int64                  `json:"id"`
	Range     RangeDetail            `json:"table"`
	Record    RangeDetail            `json:"record"`
	Index     RangeDetail            `json:"index"`
	Indices   map[string]RangeDetail `json:"indices,omitempty"`
}

// IndexRegions is the region info for one index.
type IndexRegions struct {
	Name    string               `json:"name"`
	ID      int64                `json:"id"`
	Regions []handler.RegionMeta `json:"regions"`
}

// RegionDetail is the response data for getting a region by ID;
// it includes the indices' and records' details in the current region.
type RegionDetail struct {
	RangeDetail `json:",inline"`
	RegionID    uint64              `json:"region_id"`
	Frames      []*helper.FrameItem `json:"frames"`
}

// addTableInRange inserts a table's index and record frames into RegionDetail
// when they fall within the range r.
func (rt *RegionDetail) addTableInRange(dbName string, curTable *model.TableInfo, r *helper.RegionFrameRange) {
	tName := curTable.Name.String()
	tID := curTable.ID
	pi := curTable.GetPartitionInfo()
	isCommonHandle := curTable.IsCommonHandle
	for _, index := range curTable.Indices {
		if index.Primary && isCommonHandle {
			continue
		}
		if pi != nil {
			for _, def := range pi.Definitions {
				if f := r.GetIndexFrame(def.ID, index.ID, dbName, fmt.Sprintf("%s(%s)", tName, def.Name.O), index.Name.String()); f != nil {
					rt.Frames = append(rt.Frames, f)
				}
			}
		} else if f := r.GetIndexFrame(tID, index.ID, dbName, tName, index.Name.String()); f != nil {
			rt.Frames = append(rt.Frames, f)
		}
	}

	if pi != nil {
		for _, def := range pi.Definitions {
			if f := r.GetRecordFrame(def.ID, dbName, fmt.Sprintf("%s(%s)", tName, def.Name.O), isCommonHandle); f != nil {
				rt.Frames = append(rt.Frames, f)
			}
		}
	} else if f := r.GetRecordFrame(tID, dbName, tName, isCommonHandle); f != nil {
		rt.Frames = append(rt.Frames, f)
	}
}

// FrameItem includes an index's or record's metadata along with the table's info.
type FrameItem struct {
	DBName      string   `json:"db_name"`
	TableName   string   `json:"table_name"`
	TableID     int64    `json:"table_id"`
	IsRecord    bool     `json:"is_record"`
	RecordID    int64    `json:"record_id,omitempty"`
	IndexName   string   `json:"index_name,omitempty"`
	IndexID     int64    `json:"index_id,omitempty"`
	IndexValues []string `json:"index_values,omitempty"`
}

// ServeHTTP handles requests to list (GET) or modify (POST) TiDB server settings.
func (h SettingsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodPost {
		err := req.ParseForm()
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		if levelStr := req.Form.Get("log_level"); levelStr != "" {
			err1 := logutil.SetLevel(levelStr)
			if err1 != nil {
				handler.WriteError(w, err1)
				return
			}

			config.GetGlobalConfig().Log.Level = levelStr
		}
		if generalLog := req.Form.Get("tidb_general_log"); generalLog != "" {
			switch generalLog {
			case "0":
				vardef.ProcessGeneralLog.Store(false)
			case "1":
				vardef.ProcessGeneralLog.Store(true)
			default:
				handler.WriteError(w, errors.New("illegal argument"))
				return
			}
		}
		if asyncCommit := req.Form.Get("tidb_enable_async_commit"); asyncCommit != "" {
			s, err := session.CreateSession(h.Store)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			defer s.Close()

			switch asyncCommit {
			case "0":
				err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), vardef.TiDBEnableAsyncCommit, vardef.Off)
			case "1":
				err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), vardef.TiDBEnableAsyncCommit, vardef.On)
			default:
				handler.WriteError(w, errors.New("illegal argument"))
				return
			}
			if err != nil {
				handler.WriteError(w, err)
				return
			}
		}
		if onePC := req.Form.Get("tidb_enable_1pc"); onePC != "" {
			s, err := session.CreateSession(h.Store)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			defer s.Close()

			switch onePC {
			case "0":
				err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), vardef.TiDBEnable1PC, vardef.Off)
			case "1":
				err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), vardef.TiDBEnable1PC, vardef.On)
			default:
				handler.WriteError(w, errors.New("illegal argument"))
				return
			}
			if err != nil {
				handler.WriteError(w, err)
				return
			}
		}
		if ddlSlowThreshold := req.Form.Get("ddl_slow_threshold"); ddlSlowThreshold != "" {
			threshold, err1 := strconv.Atoi(ddlSlowThreshold)
			if err1 != nil {
				handler.WriteError(w, err1)
				return
			}
			if threshold > 0 {
				atomic.StoreUint32(&vardef.DDLSlowOprThreshold, uint32(threshold))
			}
		}
		if checkMb4ValueInUtf8 := req.Form.Get("check_mb4_value_in_utf8"); checkMb4ValueInUtf8 != "" {
			switch checkMb4ValueInUtf8 {
			case "0":
				config.GetGlobalConfig().Instance.CheckMb4ValueInUTF8.Store(false)
			case "1":
				config.GetGlobalConfig().Instance.CheckMb4ValueInUTF8.Store(true)
			default:
				handler.WriteError(w, errors.New("illegal argument"))
				return
			}
		}
		if deadlockHistoryCapacity := req.Form.Get("deadlock_history_capacity"); deadlockHistoryCapacity != "" {
			capacity, err := strconv.Atoi(deadlockHistoryCapacity)
			if err != nil {
				handler.WriteError(w, errors.New("illegal argument"))
				return
			} else if capacity < 0 || capacity > 10000 {
				handler.WriteError(w, errors.New("deadlock_history_capacity out of range, should be in 0 to 10000"))
				return
			}
			cfg := config.GetGlobalConfig()
			cfg.PessimisticTxn.DeadlockHistoryCapacity = uint(capacity)
			config.StoreGlobalConfig(cfg)
			deadlockhistory.GlobalDeadlockHistory.Resize(uint(capacity))
		}
		if deadlockCollectRetryable := req.Form.Get("deadlock_history_collect_retryable"); deadlockCollectRetryable != "" {
			collectRetryable, err := strconv.ParseBool(deadlockCollectRetryable)
			if err != nil {
				handler.WriteError(w, errors.New("illegal argument"))
				return
			}
			cfg := config.GetGlobalConfig()
			cfg.PessimisticTxn.DeadlockHistoryCollectRetryable = collectRetryable
			config.StoreGlobalConfig(cfg)
		}
		if mutationChecker := req.Form.Get("tidb_enable_mutation_checker"); mutationChecker != "" {
			s, err := session.CreateSession(h.Store)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			defer s.Close()

			switch mutationChecker {
			case "0":
				err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), vardef.TiDBEnableMutationChecker, vardef.Off)
			case "1":
				err = s.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(context.Background(), vardef.TiDBEnableMutationChecker, vardef.On)
			default:
				handler.WriteError(w, errors.New("illegal argument"))
				return
			}
			if err != nil {
				handler.WriteError(w, err)
				return
			}
		}
		if transactionSummaryCapacity := req.Form.Get("transaction_summary_capacity"); transactionSummaryCapacity != "" {
			capacity, err := strconv.Atoi(transactionSummaryCapacity)
			if err != nil {
				handler.WriteError(w, errors.New("illegal argument"))
				return
			} else if capacity < 0 || capacity > 5000 {
				handler.WriteError(w, errors.New("transaction_summary_capacity out of range, should be in 0 to 5000"))
				return
			}
			cfg := config.GetGlobalConfig()
			cfg.TrxSummary.TransactionSummaryCapacity = uint(capacity)
			config.StoreGlobalConfig(cfg)
			txninfo.Recorder.ResizeSummaries(uint(capacity))
		}
		if transactionIDDigestMinDuration := req.Form.Get("transaction_id_digest_min_duration"); transactionIDDigestMinDuration != "" {
			duration, err := strconv.Atoi(transactionIDDigestMinDuration)
			if err != nil {
				handler.WriteError(w, errors.New("illegal argument"))
				return
			} else if duration < 0 || duration > 2147483647 {
				handler.WriteError(w, errors.New("transaction_id_digest_min_duration out of range, should be in 0 to 2147483647"))
				return
			}
			cfg := config.GetGlobalConfig()
			cfg.TrxSummary.TransactionIDDigestMinDuration = uint(duration)
			config.StoreGlobalConfig(cfg)
			txninfo.Recorder.SetMinDuration(time.Duration(duration) * time.Millisecond)
		}
	} else {
		handler.WriteData(w, config.GetGlobalConfig())
	}
}
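
// A usage sketch for this settings endpoint. The route path and the default
// status port (10080) below come from TiDB's HTTP API documentation rather
// than this file, so treat them as assumptions:
//
//	curl http://127.0.0.1:10080/settings
//	curl -X POST -d "log_level=debug&ddl_slow_threshold=300" http://127.0.0.1:10080/settings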

// TableFlashReplicaInfo is the replica information of a table.
type TableFlashReplicaInfo struct {
	// Modifying the field names requires negotiating with the TiFlash colleagues.
	ID             int64    `json:"id"`
	ReplicaCount   uint64   `json:"replica_count"`
	LocationLabels []string `json:"location_labels"`
	Available      bool     `json:"available"`
	HighPriority   bool     `json:"high_priority"`
}

// ServeHTTP implements the http.Handler interface.
func (h FlashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method == http.MethodPost {
		h.handleStatusReport(w, req)
		return
	}
	schema, err := h.Schema()
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	replicaInfos := make([]*TableFlashReplicaInfo, 0)
	schemas := schema.ListTablesWithSpecialAttribute(infoschemacontext.TiFlashAttribute)
	for _, schema := range schemas {
		for _, tbl := range schema.TableInfos {
			replicaInfos = appendTiFlashReplicaInfo(replicaInfos, tbl)
		}
	}

	droppedOrTruncateReplicaInfos, err := h.getDropOrTruncateTableTiflash(schema)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	replicaInfos = append(replicaInfos, droppedOrTruncateReplicaInfos...)
	handler.WriteData(w, replicaInfos)
}

func appendTiFlashReplicaInfo(replicaInfos []*TableFlashReplicaInfo, tblInfo *model.TableInfo) []*TableFlashReplicaInfo {
	if tblInfo.TiFlashReplica == nil {
		return replicaInfos
	}
	if pi := tblInfo.GetPartitionInfo(); pi != nil {
		for _, p := range pi.Definitions {
			replicaInfos = append(replicaInfos, &TableFlashReplicaInfo{
				ID:             p.ID,
				ReplicaCount:   tblInfo.TiFlashReplica.Count,
				LocationLabels: tblInfo.TiFlashReplica.LocationLabels,
				Available:      tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID),
			})
		}
		for _, p := range pi.AddingDefinitions {
			replicaInfos = append(replicaInfos, &TableFlashReplicaInfo{
				ID:             p.ID,
				ReplicaCount:   tblInfo.TiFlashReplica.Count,
				LocationLabels: tblInfo.TiFlashReplica.LocationLabels,
				Available:      tblInfo.TiFlashReplica.IsPartitionAvailable(p.ID),
				HighPriority:   true,
			})
		}
		return replicaInfos
	}
	replicaInfos = append(replicaInfos, &TableFlashReplicaInfo{
		ID:             tblInfo.ID,
		ReplicaCount:   tblInfo.TiFlashReplica.Count,
		LocationLabels: tblInfo.TiFlashReplica.LocationLabels,
		Available:      tblInfo.TiFlashReplica.Available,
	})
	return replicaInfos
}

func (h FlashReplicaHandler) getDropOrTruncateTableTiflash(currentSchema infoschema.InfoSchema) ([]*TableFlashReplicaInfo, error) {
	s, err := session.CreateSession(h.Store)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer s.Close()

	store := domain.GetDomain(s).Store()
	txn, err := store.Begin()
	if err != nil {
		return nil, errors.Trace(err)
	}
	gcSafePoint, err := gcutil.GetGCSafePoint(s)
	if err != nil {
		return nil, err
	}
	replicaInfos := make([]*TableFlashReplicaInfo, 0)
	uniqueIDMap := make(map[int64]struct{})
	handleJobAndTableInfo := func(_ *model.Job, tblInfo *model.TableInfo) (bool, error) {
		// Avoid duplicate table ID info.
		if _, ok := currentSchema.TableByID(context.Background(), tblInfo.ID); ok {
			return false, nil
		}
		if _, ok := uniqueIDMap[tblInfo.ID]; ok {
			return false, nil
		}
		uniqueIDMap[tblInfo.ID] = struct{}{}
		replicaInfos = appendTiFlashReplicaInfo(replicaInfos, tblInfo)
		return false, nil
	}
	dom := domain.GetDomain(s)
	fn := func(jobs []*model.Job) (bool, error) {
		return executor.GetDropOrTruncateTableInfoFromJobs(jobs, gcSafePoint, dom, handleJobAndTableInfo)
	}
	err = ddl.IterAllDDLJobs(s, txn, fn)
	if err != nil {
		if terror.ErrorEqual(variable.ErrSnapshotTooOld, err) {
			// The error indicates that the current DDL job and the remaining DDL jobs
			// have been deleted by GC; just ignore it and return directly.
			return replicaInfos, nil
		}
		return nil, err
	}
	return replicaInfos, nil
}

type tableFlashReplicaStatus struct {
	// Modifying the field names requires negotiating with the TiFlash colleagues.
	ID int64 `json:"id"`
	// RegionCount is the number of regions that need to sync.
	RegionCount uint64 `json:"region_count"`
	// FlashRegionCount is the number of regions that have already completed syncing.
	FlashRegionCount uint64 `json:"flash_region_count"`
}

// checkTableFlashReplicaAvailable reports whether the table's flash replica is available.
func (tf *tableFlashReplicaStatus) checkTableFlashReplicaAvailable() bool {
	return tf.FlashRegionCount == tf.RegionCount
}

func (h FlashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http.Request) {
	var status tableFlashReplicaStatus
	err := json.NewDecoder(req.Body).Decode(&status)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	do, err := session.GetDomain(h.Store)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	s, err := session.CreateSession(h.Store)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	defer s.Close()

	available := status.checkTableFlashReplicaAvailable()
	err = do.DDLExecutor().UpdateTableReplicaInfo(s, status.ID, available)
	if err != nil {
		handler.WriteError(w, err)
	}
	if available {
		var tableInfo model.TableInfo
		tableInfo.ID = status.ID
		err = infosync.DeleteTiFlashTableSyncProgress(&tableInfo)
	} else {
		progress := float64(status.FlashRegionCount) / float64(status.RegionCount)
		err = infosync.UpdateTiFlashProgressCache(status.ID, progress)
	}
	if err != nil {
		handler.WriteError(w, err)
	}

	logutil.BgLogger().Info("handle flash replica report",
		zap.Int64("table ID", status.ID),
		zap.Uint64("region count", status.RegionCount),
		zap.Uint64("flash region count", status.FlashRegionCount),
		zap.Error(err))
}
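
// A sketch of the status report TiFlash is expected to POST here. The field
// names come from tableFlashReplicaStatus above; the route path and port are
// assumptions based on TiDB's HTTP API documentation:
//
//	curl -X POST -H "Content-Type: application/json" \
//	    -d '{"id": 84, "region_count": 3, "flash_region_count": 3}' \
//	    http://127.0.0.1:10080/tiflash/replica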

// SchemaTableStorage is the schema table storage info.
type SchemaTableStorage struct {
	TableSchema   string `json:"table_schema"`
	TableName     string `json:"table_name"`
	TableRows     int64  `json:"table_rows"`
	AvgRowLength  int64  `json:"avg_row_length"`
	DataLength    int64  `json:"data_length"`
	MaxDataLength int64  `json:"max_data_length"`
	IndexLength   int64  `json:"index_length"`
	DataFree      int64  `json:"data_free"`
}

func getSchemaTablesStorageInfo(h *SchemaStorageHandler, schema *ast.CIStr, table *ast.CIStr) (messages []*SchemaTableStorage, err error) {
	var s sessionapi.Session
	if s, err = session.CreateSession(h.Store); err != nil {
		return
	}
	defer s.Close()

	sctx := s.(sessionctx.Context)
	condition := make([]string, 0)
	params := make([]any, 0)

	if schema != nil {
		condition = append(condition, `TABLE_SCHEMA = %?`)
		params = append(params, schema.O)
	}
	if table != nil {
		condition = append(condition, `TABLE_NAME = %?`)
		params = append(params, table.O)
	}

	sql := `select TABLE_SCHEMA,TABLE_NAME,TABLE_ROWS,AVG_ROW_LENGTH,DATA_LENGTH,MAX_DATA_LENGTH,INDEX_LENGTH,DATA_FREE from INFORMATION_SCHEMA.TABLES`
	if len(condition) > 0 {
		//nolint: gosec
		sql += ` WHERE ` + strings.Join(condition, ` AND `)
	}
	var results sqlexec.RecordSet
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
	if results, err = sctx.GetSQLExecutor().ExecuteInternal(ctx, sql, params...); err != nil {
		logutil.BgLogger().Error(`ExecuteInternal`, zap.Error(err))
	} else if results != nil {
		messages = make([]*SchemaTableStorage, 0)
		defer terror.Call(results.Close)
		for {
			req := results.NewChunk(nil)
			if err = results.Next(ctx, req); err != nil {
				break
			}

			if req.NumRows() == 0 {
				break
			}

			for i := range req.NumRows() {
				messages = append(messages, &SchemaTableStorage{
					TableSchema:   req.GetRow(i).GetString(0),
					TableName:     req.GetRow(i).GetString(1),
					TableRows:     req.GetRow(i).GetInt64(2),
					AvgRowLength:  req.GetRow(i).GetInt64(3),
					DataLength:    req.GetRow(i).GetInt64(4),
					MaxDataLength: req.GetRow(i).GetInt64(5),
					IndexLength:   req.GetRow(i).GetInt64(6),
					DataFree:      req.GetRow(i).GetInt64(7),
				})
			}
		}
	}
	return
}

// ServeHTTP handles requests to list the storage information of a database's
// or a table's schemas.
func (h SchemaStorageHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	schema, err := h.Schema()
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	// parse params
	params := mux.Vars(req)

	var (
		dbName    *ast.CIStr
		tableName *ast.CIStr
		isSingle  bool
	)

	if reqDbName, ok := params[handler.DBName]; ok {
		cDBName := ast.NewCIStr(reqDbName)
		// all table schemas in a specified database
		schemaInfo, exists := schema.SchemaByName(cDBName)
		if !exists {
			handler.WriteError(w, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(reqDbName))
			return
		}
		dbName = &schemaInfo.Name

		if reqTableName, ok := params[handler.TableName]; ok {
			// table schema of a specified table name
			cTableName := ast.NewCIStr(reqTableName)
			data, e := schema.TableByName(context.Background(), cDBName, cTableName)
			if e != nil {
				handler.WriteError(w, e)
				return
			}
			tableName = &data.Meta().Name
			isSingle = true
		}
	}

	if results, e := getSchemaTablesStorageInfo(&h, dbName, tableName); e != nil {
		handler.WriteError(w, e)
	} else {
		if isSingle {
			handler.WriteData(w, results[0])
		} else {
			handler.WriteData(w, results)
		}
	}
}
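
// A usage sketch, assuming the /schema_storage route from TiDB's HTTP API
// documentation (the db and table names are placeholders):
//
//	curl http://127.0.0.1:10080/schema_storage/test
//	curl http://127.0.0.1:10080/schema_storage/test/t1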

// WriteDBTablesData writes all the table data in a database. The format is the
// marshal result of []*model.TableInfo, so you can unmarshal it back into
// []*model.TableInfo.
//
// Note: it returns StatusOK even if errors occur; but if errors do occur,
// there must be a bug somewhere.
func WriteDBTablesData(w http.ResponseWriter, tbs []*model.TableInfo) {
	a := make([]any, 0, len(tbs))
	for _, tb := range tbs {
		a = append(a, tb)
	}
	manualWriteJSONArray(w, a)
}

// manualWriteJSONArray manually constructs the marshal result so that the memory
// can be deallocated quickly. Every item in the input is marshaled separately,
// producing results like {tb1} {tb2} {tb3}; then some bytes are added to turn
// this into [{tb1}, {tb2}, {tb3}], a valid JSON array.
func manualWriteJSONArray(w http.ResponseWriter, array []any) {
	if len(array) == 0 {
		handler.WriteData(w, []*model.TableInfo{})
		return
	}
	w.Header().Set(handler.HeaderContentType, handler.ContentTypeJSON)
	// We assume that marshaling always succeeds.
	w.WriteHeader(http.StatusOK)
	_, err := w.Write(hack.Slice("[\n"))
	if err != nil {
		terror.Log(errors.Trace(err))
		return
	}
	init := false
	for _, item := range array {
		if init {
			_, err = w.Write(hack.Slice(",\n"))
			if err != nil {
				terror.Log(errors.Trace(err))
				return
			}
		} else {
			init = true
		}
		js, err := json.MarshalIndent(item, "", " ")
		if err != nil {
			terror.Log(errors.Trace(err))
			return
		}
		_, err = w.Write(js)
		if err != nil {
			terror.Log(errors.Trace(err))
			return
		}
	}
	_, err = w.Write(hack.Slice("\n]"))
	terror.Log(errors.Trace(err))
}

func writeDBSimpleTablesData(w http.ResponseWriter, tbs []*model.TableNameInfo) {
	a := make([]any, 0, len(tbs))
	for _, tb := range tbs {
		a = append(a, tb)
	}
	manualWriteJSONArray(w, a)
}

// ServeHTTP handles requests to list a database's or a table's schemas.
func (h SchemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	schema, err := h.Schema()
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	// parse params
	params := mux.Vars(req)

	if dbName, ok := params[handler.DBName]; ok {
		cDBName := ast.NewCIStr(dbName)
		if tableName, ok := params[handler.TableName]; ok {
			// table schema of a specified table name
			cTableName := ast.NewCIStr(tableName)
			data, err := schema.TableByName(context.Background(), cDBName, cTableName)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			handler.WriteData(w, data.Meta())
			return
		}
		// all table schemas in a specified database
		if schema.SchemaExists(cDBName) {
			if a := req.FormValue(handler.IDNameOnly); a == "true" {
				tbs, err := schema.SchemaSimpleTableInfos(context.Background(), cDBName)
				if err != nil {
					handler.WriteError(w, err)
					return
				}
				writeDBSimpleTablesData(w, tbs)
				return
			}
			tbs, err := schema.SchemaTableInfos(context.Background(), cDBName)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			WriteDBTablesData(w, tbs)
			return
		}
		handler.WriteError(w, infoschema.ErrDatabaseNotExists.GenWithStackByArgs(dbName))
		return
	}

	if tableID := req.FormValue(handler.TableIDQuery); len(tableID) > 0 {
		// table schema of a specified tableID
		data, err := getTableByIDStr(schema, tableID)
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		handler.WriteData(w, data)
		return
	}

	if tableIDsStr := req.FormValue(handler.TableIDsQuery); len(tableIDsStr) > 0 {
		tableIDs := strings.Split(tableIDsStr, ",")
		data := make(map[int64]*model.TableInfo, len(tableIDs))
		for _, tableID := range tableIDs {
			tbl, err := getTableByIDStr(schema, tableID)
			if err == nil {
				data[tbl.ID] = tbl
			}
		}
		if len(data) > 0 {
			handler.WriteData(w, data)
		} else {
			handler.WriteError(w, errors.New("All tables are not found"))
		}
		return
	}

	// all databases' schemas
	handler.WriteData(w, schema.AllSchemas())
}
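
// A usage sketch for the schema endpoints, assuming the /schema routes from
// TiDB's HTTP API documentation (names and IDs are placeholders):
//
//	curl http://127.0.0.1:10080/schema                 # all databases' schemas
//	curl http://127.0.0.1:10080/schema/test            # all tables in database "test"
//	curl http://127.0.0.1:10080/schema/test/t1         # one table's schema
//	curl "http://127.0.0.1:10080/schema?table_id=45"   # look up by table ID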

func getTableByIDStr(schema infoschema.InfoSchema, tableID string) (*model.TableInfo, error) {
	tid, err := strconv.Atoi(tableID)
	if err != nil {
		return nil, err
	}
	if tid < 0 {
		return nil, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID)
	}
	if data, ok := schema.TableByID(context.Background(), int64(tid)); ok {
		return data.Meta(), nil
	}
	// The tid may be a partition ID of a partitioned table.
	tbl, _, _ := schema.FindTableByPartitionID(int64(tid))
	if tbl == nil {
		return nil, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID)
	}
	return tbl.Meta(), nil
}

// ServeHTTP handles table-related requests, such as a table's region
// information and disk usage.
func (h *TableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// parse params
	params := mux.Vars(req)
	dbName := params[handler.DBName]
	tableName := params[handler.TableName]
	schema, err := h.Schema()
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	tableName, partitionName := handler.ExtractTableAndPartitionName(tableName)
	tableVal, err := schema.TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tableName))
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	switch h.op {
	case OpTableRegions:
		h.handleRegionRequest(tableVal, w)
	case OpTableRanges:
		h.handleRangeRequest(tableVal, w)
	case OpTableDiskUsage:
		h.handleDiskUsageRequest(tableVal, w)
	case OpTableScatter:
		// Supports partitioned tables: only get one physical table, to prevent
		// creating too many scatter schedulers.
		ptbl, err := h.GetPartition(tableVal, partitionName)
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		h.handleScatterTableRequest(ptbl, w)
	case OpStopTableScatter:
		ptbl, err := h.GetPartition(tableVal, partitionName)
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		h.handleStopScatterTableRequest(ptbl, w)
	default:
		handler.WriteError(w, errors.New("method not found"))
	}
}
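
// A usage sketch, assuming the /tables routes from TiDB's HTTP API
// documentation; each path suffix corresponds to one of the Op* constants
// defined above:
//
//	curl http://127.0.0.1:10080/tables/test/t1/regions
//	curl http://127.0.0.1:10080/tables/test/t1/ranges
//	curl http://127.0.0.1:10080/tables/test/t1/disk-usage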

// ServeHTTP handles requests for DDL job history.
func (h DDLHistoryJobHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	var (
		jobID   = 0
		limitID = 0
		err     error
	)
	if jobValue := req.FormValue(handler.JobID); len(jobValue) > 0 {
		jobID, err = strconv.Atoi(jobValue)
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		if jobID < 1 {
			handler.WriteError(w, errors.New("ddl history start_job_id must be greater than 0"))
			return
		}
	}
	if limitValue := req.FormValue(handler.Limit); len(limitValue) > 0 {
		limitID, err = strconv.Atoi(limitValue)
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		if limitID < 1 || limitID > ddl.DefNumGetDDLHistoryJobs {
			handler.WriteError(w,
				errors.Errorf("ddl history limit must be greater than 0 and less than or equal to %v", ddl.DefNumGetDDLHistoryJobs))
			return
		}
	}

	jobs, err := h.getHistoryDDL(jobID, limitID)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	handler.WriteData(w, jobs)
}
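
// A usage sketch, assuming the /ddl/history route from TiDB's HTTP API
// documentation; both query parameters are optional:
//
//	curl "http://127.0.0.1:10080/ddl/history?start_job_id=41&limit=10"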

func (h DDLHistoryJobHandler) getHistoryDDL(jobID, limit int) (jobs []*model.Job, err error) {
	txn, err := h.Store.Begin()
	if err != nil {
		return nil, errors.Trace(err)
	}
	txnMeta := meta.NewMutator(txn)

	jobs, err = ddl.ScanHistoryDDLJobs(txnMeta, int64(jobID), limit)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return jobs, nil
}

func (h DDLResignOwnerHandler) resignDDLOwner() error {
	dom, err := session.GetDomain(h.store)
	if err != nil {
		return errors.Trace(err)
	}

	ownerMgr := dom.DDL().OwnerManager()
	err = ownerMgr.ResignOwner(context.Background())
	if err != nil {
		return errors.Trace(err)
	}
	return nil
}

// ServeHTTP handles requests to resign the DDL owner.
func (h DDLResignOwnerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		handler.WriteError(w, errors.Errorf("This api only support POST method"))
		return
	}

	err := h.resignDDLOwner()
	if err != nil {
		log.Error("failed to resign DDL owner", zap.Error(err))
		handler.WriteError(w, err)
		return
	}

	handler.WriteData(w, "success!")
}
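
// A usage sketch, assuming the /ddl/owner/resign route from TiDB's HTTP API
// documentation (only POST is accepted, as checked above):
//
//	curl -X POST http://127.0.0.1:10080/ddl/owner/resign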

func (h *TableHandler) getPDAddr() ([]string, error) {
	etcd, ok := h.Store.(kv.EtcdBackend)
	if !ok {
		return nil, errors.New("not implemented")
	}
	pdAddrs, err := etcd.EtcdAddrs()
	if err != nil {
		return nil, err
	}
	if len(pdAddrs) == 0 {
		return nil, errors.New("pd unavailable")
	}
	return pdAddrs, nil
}

func (h *TableHandler) addScatterSchedule(startKey, endKey []byte, name string) error {
	pdAddrs, err := h.getPDAddr()
	if err != nil {
		return err
	}
	input := map[string]string{
		"name":       "scatter-range",
		"start_key":  url.QueryEscape(string(startKey)),
		"end_key":    url.QueryEscape(string(endKey)),
		"range_name": name,
	}
	v, err := json.Marshal(input)
	if err != nil {
		return err
	}
	scheduleURL := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), pdAddrs[0], pd.Schedulers)
	resp, err := util.InternalHTTPClient().Post(scheduleURL, "application/json", bytes.NewBuffer(v))
	if err != nil {
		return err
	}
	if err := resp.Body.Close(); err != nil {
		log.Error("failed to close response body", zap.Error(err))
	}
	return nil
}
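
// For reference, the JSON body addScatterSchedule posts to PD's scheduler
// endpoint looks like the sketch below; the key values are URL-escaped raw
// bytes, and the range name shown is illustrative:
//
//	{
//	    "name": "scatter-range",
//	    "start_key": "<url-escaped start key>",
//	    "end_key": "<url-escaped end key>",
//	    "range_name": "t1-100"
//	}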

func (h *TableHandler) deleteScatterSchedule(name string) error {
	pdAddrs, err := h.getPDAddr()
	if err != nil {
		return err
	}
	scheduleURL := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), pdAddrs[0], pd.ScatterRangeSchedulerWithName(name))
	req, err := http.NewRequest(http.MethodDelete, scheduleURL, nil)
	if err != nil {
		return err
	}
	resp, err := util.InternalHTTPClient().Do(req)
	if err != nil {
		return err
	}
	if err := resp.Body.Close(); err != nil {
		log.Error("failed to close response body", zap.Error(err))
	}
	return nil
}

func (h *TableHandler) handleScatterTableRequest(tbl table.PhysicalTable, w http.ResponseWriter) {
	// for record
	tableID := tbl.GetPhysicalID()
	startKey, endKey := tablecodec.GetTableHandleKeyRange(tableID)
	startKey = codec.EncodeBytes([]byte{}, startKey)
	endKey = codec.EncodeBytes([]byte{}, endKey)
	tableName := fmt.Sprintf("%s-%d", tbl.Meta().Name.String(), tableID)
	err := h.addScatterSchedule(startKey, endKey, tableName)
	if err != nil {
		handler.WriteError(w, errors.Annotate(err, "scatter record error"))
		return
	}
	// for indices
	for _, index := range tbl.Indices() {
		indexID := index.Meta().ID
		indexName := index.Meta().Name.String()
		startKey, endKey := tablecodec.GetTableIndexKeyRange(tableID, indexID)
		startKey = codec.EncodeBytes([]byte{}, startKey)
		endKey = codec.EncodeBytes([]byte{}, endKey)
		name := tableName + "-" + indexName
		err := h.addScatterSchedule(startKey, endKey, name)
		if err != nil {
			handler.WriteError(w, errors.Annotatef(err, "scatter index(%s) error", name))
			return
		}
	}
	handler.WriteData(w, "success!")
}

func (h *TableHandler) handleStopScatterTableRequest(tbl table.PhysicalTable, w http.ResponseWriter) {
	// for record
	tableName := fmt.Sprintf("%s-%d", tbl.Meta().Name.String(), tbl.GetPhysicalID())
	err := h.deleteScatterSchedule(tableName)
	if err != nil {
		handler.WriteError(w, errors.Annotate(err, "stop scatter record error"))
		return
	}
	// for indices
	for _, index := range tbl.Indices() {
		indexName := index.Meta().Name.String()
		name := tableName + "-" + indexName
		err := h.deleteScatterSchedule(name)
		if err != nil {
			handler.WriteError(w, errors.Annotatef(err, "delete scatter index(%s) error", name))
			return
		}
	}
	handler.WriteData(w, "success!")
}

func (h *TableHandler) handleRegionRequest(tbl table.Table, w http.ResponseWriter) {
	pi := tbl.Meta().GetPartitionInfo()
	if pi != nil {
		// Partitioned table.
		var data []*TableRegions
		for _, def := range pi.Definitions {
			tableRegions, err := h.getRegionsByID(tbl, def.ID, def.Name.O)
			if err != nil {
				handler.WriteError(w, err)
				return
			}

			data = append(data, tableRegions)
		}
		handler.WriteData(w, data)
		return
	}

	meta := tbl.Meta()
	tableRegions, err := h.getRegionsByID(tbl, meta.ID, meta.Name.O)
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	handler.WriteData(w, tableRegions)
}

func createTableRanges(tblID int64, tblName string, indices []*model.IndexInfo) *TableRanges {
	indexPrefix := tablecodec.GenTableIndexPrefix(tblID)
	recordPrefix := tablecodec.GenTableRecordPrefix(tblID)
	tableEnd := tablecodec.EncodeTablePrefix(tblID + 1)
	ranges := &TableRanges{
		TableName: tblName,
		TableID:   tblID,
		Range:     createRangeDetail(tablecodec.EncodeTablePrefix(tblID), tableEnd),
		Record:    createRangeDetail(recordPrefix, tableEnd),
		Index:     createRangeDetail(indexPrefix, recordPrefix),
	}
	if len(indices) != 0 {
		indexRanges := make(map[string]RangeDetail)
		for _, index := range indices {
			start := tablecodec.EncodeTableIndexPrefix(tblID, index.ID)
			end := tablecodec.EncodeTableIndexPrefix(tblID, index.ID+1)
			indexRanges[index.Name.String()] = createRangeDetail(start, end)
		}
		ranges.Indices = indexRanges
	}
	return ranges
}
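
// A sketch of the key layout createTableRanges relies on, following TiDB's
// table key encoding (prefix byte 't', "_i" for index data, "_r" for record
// data; written here informally):
//
//	table range:    [t<tblID>,          t<tblID+1>)
//	index range:    [t<tblID>_i,        t<tblID>_r)
//	record range:   [t<tblID>_r,        t<tblID+1>)
//	one index:      [t<tblID>_i<idxID>, t<tblID>_i<idxID+1>)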

func (*TableHandler) handleRangeRequest(tbl table.Table, w http.ResponseWriter) {
	meta := tbl.Meta()
	pi := meta.GetPartitionInfo()
	if pi != nil {
		// Partitioned table.
		var data []*TableRanges
		for _, def := range pi.Definitions {
			data = append(data, createTableRanges(def.ID, def.Name.String(), meta.Indices))
		}
		handler.WriteData(w, data)
		return
	}

	handler.WriteData(w, createTableRanges(meta.ID, meta.Name.String(), meta.Indices))
}

func (h *TableHandler) getRegionsByID(tbl table.Table, id int64, name string) (*TableRegions, error) {
	// for record
	startKey, endKey := tablecodec.GetTableHandleKeyRange(id)
	ctx := context.Background()
	pdCli := h.RegionCache.PDClient()
	regions, err := pdCli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1, opt.WithAllowFollowerHandle())
	if err != nil {
		return nil, err
	}

	recordRegions := make([]handler.RegionMeta, 0, len(regions))
	for _, region := range regions {
		meta := handler.RegionMeta{
			ID:          region.Meta.Id,
			Leader:      region.Leader,
			Peers:       region.Meta.Peers,
			RegionEpoch: region.Meta.RegionEpoch,
		}
		recordRegions = append(recordRegions, meta)
	}

	// for indices
	indices := make([]IndexRegions, len(tbl.Indices()))
	for i, index := range tbl.Indices() {
		indexID := index.Meta().ID
		indices[i].Name = index.Meta().Name.String()
		indices[i].ID = indexID
		startKey, endKey := tablecodec.GetTableIndexKeyRange(id, indexID)
		regions, err := pdCli.BatchScanRegions(ctx, []router.KeyRange{{StartKey: startKey, EndKey: endKey}}, -1, opt.WithAllowFollowerHandle())
		if err != nil {
			return nil, err
		}
		indexRegions := make([]handler.RegionMeta, 0, len(regions))
		for _, region := range regions {
			meta := handler.RegionMeta{
				ID:          region.Meta.Id,
				Leader:      region.Leader,
				Peers:       region.Meta.Peers,
				RegionEpoch: region.Meta.RegionEpoch,
			}
			indexRegions = append(indexRegions, meta)
		}
		indices[i].Regions = indexRegions
	}

	return &TableRegions{
		TableName:     name,
		TableID:       id,
		Indices:       indices,
		RecordRegions: recordRegions,
	}, nil
}

func (h *TableHandler) handleDiskUsageRequest(tbl table.Table, w http.ResponseWriter) {
	stats, err := h.GetPDRegionStats(context.Background(), tbl.Meta().ID, false)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	handler.WriteData(w, stats.StorageSize)
}

// ServeHTTP handles requests to get a region by ID.
func (h RegionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	// parse and check params
	params := mux.Vars(req)
	if _, ok := params[handler.RegionID]; !ok {
		router := mux.CurrentRoute(req).GetName()
		if router == "RegionsMeta" {
			startKey := []byte{'m'}
			endKey := []byte{'n'}

			recordRegionIDs, err := h.RegionCache.ListRegionIDsInKeyRange(tikv.NewBackofferWithVars(context.Background(), 500, nil), startKey, endKey)
			if err != nil {
				handler.WriteError(w, err)
				return
			}

			recordRegions, err := h.GetRegionsMeta(recordRegionIDs)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			handler.WriteData(w, recordRegions)
			return
		}
		if router == "RegionHot" {
			schema, err := h.Schema()
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			ctx := context.Background()
			hotRead, err := h.ScrapeHotInfo(ctx, helper.HotRead, schema, nil)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			hotWrite, err := h.ScrapeHotInfo(ctx, helper.HotWrite, schema, nil)
			if err != nil {
				handler.WriteError(w, err)
				return
			}
			handler.WriteData(w, map[string]any{
				"write": hotWrite,
				"read":  hotRead,
			})
			return
		}
		return
	}

	regionIDInt, err := strconv.ParseInt(params[handler.RegionID], 0, 64)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	regionID := uint64(regionIDInt)

	// locate region
	region, err := h.RegionCache.LocateRegionByID(tikv.NewBackofferWithVars(context.Background(), 500, nil), regionID)
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	frameRange, err := helper.NewRegionFrameRange(region)
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	// create RegionDetail from RegionFrameRange
	regionDetail := &RegionDetail{
		RegionID:    regionID,
		RangeDetail: createRangeDetail(region.StartKey, region.EndKey),
	}
	schema, err := h.Schema()
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	// Since we need a database name for each frame, and a table's database name
	// cannot be obtained from the table ID directly, we iterate over all schemas
	// and tables here instead of doing something like
	// `for id in [frameRange.firstTableID, frameRange.endTableID]`;
	// that ID range is small enough for this to be acceptable.
	for _, dbName := range schema.AllSchemaNames() {
		if metadef.IsMemDB(dbName.L) {
			continue
		}
		tables, err := schema.SchemaTableInfos(context.Background(), dbName)
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		for _, tableVal := range tables {
			regionDetail.addTableInRange(dbName.String(), tableVal, frameRange)
		}
	}
	handler.WriteData(w, regionDetail)
}

// parseQuery parses the query string in a URL, optionally unescaping it,
// because Go's net/http package cannot distinguish a query like "?a=" from
// "?a". We parse these two cases separately:
// "?a=" means that a is the empty string "";
// "?a" means that a is null.
// If shouldUnescape is true, QueryUnescape is applied to the keys and values
// that are put into m; if shouldUnescape is false, they are used as-is.
func parseQuery(query string, m url.Values, shouldUnescape bool) error {
	var err error
	for query != "" {
		key := query
		if i := strings.IndexAny(key, "&;"); i >= 0 {
			key, query = key[:i], key[i+1:]
		} else {
			query = ""
		}
		if key == "" {
			continue
		}
		if i := strings.Index(key, "="); i >= 0 {
			value := ""
			key, value = key[:i], key[i+1:]
			if shouldUnescape {
				key, err = url.QueryUnescape(key)
				if err != nil {
					return errors.Trace(err)
				}
				value, err = url.QueryUnescape(value)
				if err != nil {
					return errors.Trace(err)
				}
			}
			m[key] = append(m[key], value)
		} else {
			if shouldUnescape {
				key, err = url.QueryUnescape(key)
				if err != nil {
					return errors.Trace(err)
				}
			}
			if _, ok := m[key]; !ok {
				m[key] = nil
			}
		}
	}
	return errors.Trace(err)
}
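
// A minimal sketch of the distinction parseQuery preserves (the query string
// is illustrative):
//
//	m := make(url.Values)
//	_ = parseQuery("a=&b", m, true)
//	// m["a"] == []string{""} -> "a" is the empty string
//	// m["b"] == nil          -> "b" is null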

// ServeHTTP handles MVCC debugging requests, dispatching on the handler's op.
func (h MvccTxnHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	var data any
	params := mux.Vars(req)
	var err error
	switch h.op {
	case OpMvccGetByHex:
		data, err = h.HandleMvccGetByHex(params)
	case OpMvccGetByIdx, OpMvccGetByKey:
		if req.URL == nil {
			err = errors.BadRequestf("Invalid URL")
			break
		}
		values := make(url.Values)
		err = parseQuery(req.URL.RawQuery, values, true)
		if err == nil {
			if h.op == OpMvccGetByIdx {
				data, err = h.handleMvccGetByIdx(params, values)
			} else {
				data, err = h.handleMvccGetByKey(params, values)
			}
		}
	case OpMvccGetByTxn:
		data, err = h.handleMvccGetByTxn(params)
	default:
		err = errors.NotSupportedf("Operation not supported.")
	}
	if err != nil {
		handler.WriteError(w, err)
	} else {
		handler.WriteData(w, data)
	}
}
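
// Usage sketches for the MVCC debug endpoints, assuming the /mvcc routes from
// TiDB's HTTP API documentation; each path prefix selects one of the OpMvcc*
// constants above, and the concrete values are placeholders:
//
//	curl http://127.0.0.1:10080/mvcc/key/test/t1/1
//	curl http://127.0.0.1:10080/mvcc/hex/{hexKey}
//	curl http://127.0.0.1:10080/mvcc/txn/{startTS}/test/t1
//	curl "http://127.0.0.1:10080/mvcc/index/test/t1/idx/1?a=A"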

// handleMvccGetByIdx gets MVCC info by an index key.
func (h MvccTxnHandler) handleMvccGetByIdx(params map[string]string, values url.Values) (any, error) {
	dbName := params[handler.DBName]
	tableName := params[handler.TableName]

	t, err := h.GetTable(dbName, tableName)
	if err != nil {
		return nil, errors.Trace(err)
	}
	handle, err := h.GetHandle(t, params, values)
	if err != nil {
		return nil, errors.Trace(err)
	}

	var idxCols []*model.ColumnInfo
	var idx table.Index
	for _, v := range t.Indices() {
		if strings.EqualFold(v.Meta().Name.String(), params[handler.IndexName]) {
			for _, c := range v.Meta().Columns {
				idxCols = append(idxCols, t.Meta().Columns[c.Offset])
			}
			idx = v
			break
		}
	}
	if idx == nil {
		return nil, errors.NotFoundf("Index %s not found!", params[handler.IndexName])
	}
	return h.GetMvccByIdxValue(idx, values, idxCols, handle)
}

func (h MvccTxnHandler) handleMvccGetByKey(params map[string]string, values url.Values) (any, error) {
	dbName := params[handler.DBName]
	tableName := params[handler.TableName]
	tb, err := h.GetTable(dbName, tableName)
	if err != nil {
		return nil, errors.Trace(err)
	}
	handle, err := h.GetHandle(tb, params, values)
	if err != nil {
		return nil, err
	}

	encodedKey := tablecodec.EncodeRecordKey(tb.RecordPrefix(), handle)
	data, err := h.GetMvccByEncodedKey(encodedKey)
	if err != nil {
		return nil, err
	}
	regionID, err := h.GetRegionIDByKey(encodedKey)
	if err != nil {
		return nil, err
	}
	resp := &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), Value: data, RegionID: regionID}
	if len(values.Get("decode")) == 0 {
		return resp, nil
	}
	colMap := make(map[int64]*types.FieldType, 3)
	for _, col := range tb.Meta().Columns {
		colMap[col.ID] = &(col.FieldType)
	}

	respValue := resp.Value
	var result any = resp
	if respValue.Info != nil {
		datas := make(map[string]map[string]string)
		for _, w := range respValue.Info.Writes {
			if len(w.ShortValue) > 0 {
				datas[strconv.FormatUint(w.StartTs, 10)], err = h.decodeMvccData(w.ShortValue, colMap, tb.Meta())
			}
		}

		for _, v := range respValue.Info.Values {
			if len(v.Value) > 0 {
				datas[strconv.FormatUint(v.StartTs, 10)], err = h.decodeMvccData(v.Value, colMap, tb.Meta())
			}
		}

		if len(datas) > 0 {
			re := map[string]any{
				"key":  resp.Key,
				"info": respValue.Info,
				"data": datas,
			}
			if err != nil {
				re["decode_error"] = err.Error()
			}
			result = re
		}
	}

	return result, nil
}

func (MvccTxnHandler) decodeMvccData(bs []byte, colMap map[int64]*types.FieldType, tb *model.TableInfo) (map[string]string, error) {
	rs, err := tablecodec.DecodeRowToDatumMap(bs, colMap, time.UTC)
	record := make(map[string]string, len(tb.Columns))
	for _, col := range tb.Columns {
		if c, ok := rs[col.ID]; ok {
			data := "nil"
			if !c.IsNull() {
				data, err = c.ToString()
			}
			record[col.Name.O] = data
		}
	}
	return record, err
}

func (h *MvccTxnHandler) handleMvccGetByTxn(params map[string]string) (any, error) {
	startTS, err := strconv.ParseInt(params[handler.StartTS], 0, 64)
	if err != nil {
		return nil, errors.Trace(err)
	}
	tableID, err := h.GetTableID(params[handler.DBName], params[handler.TableName])
	if err != nil {
		return nil, errors.Trace(err)
	}
	startKey := tablecodec.EncodeTablePrefix(tableID)
	endKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(math.MaxInt64))
	return h.GetMvccByStartTs(uint64(startTS), startKey, endKey)
}

// ServerInfo is used to report this server's info when serving an HTTP request.
type ServerInfo struct {
	IsOwner  bool `json:"is_owner"`
	MaxProcs int  `json:"max_procs"`
	GOGC     int  `json:"gogc"`
	*serverinfo.ServerInfo
}

// ServeHTTP handles requests for this TiDB server's info.
func (h ServerInfoHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	do, err := session.GetDomain(h.Store)
	if err != nil {
		handler.WriteError(w, errors.New("create session error"))
		log.Error("failed to get session domain", zap.Error(err))
		return
	}
	info := ServerInfo{}
	info.ServerInfo, err = infosync.GetServerInfo()
	if err != nil {
		handler.WriteError(w, err)
		log.Error("failed to get server info", zap.Error(err))
		return
	}
	info.IsOwner = do.DDL().OwnerManager().IsOwner()
	info.MaxProcs = runtime.GOMAXPROCS(0)
	info.GOGC = util.GetGOGC()
	handler.WriteData(w, info)
}
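
// A usage sketch, assuming the /info and /info/all routes from TiDB's HTTP API
// documentation:
//
//	curl http://127.0.0.1:10080/info
//	curl http://127.0.0.1:10080/info/all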

// ClusterServerInfo is used to report the cluster servers' info when serving an HTTP request.
type ClusterServerInfo struct {
	ServersNum                   int                               `json:"servers_num,omitempty"`
	OwnerID                      string                            `json:"owner_id"`
	IsAllServerVersionConsistent bool                              `json:"is_all_server_version_consistent,omitempty"`
	AllServersDiffVersions       []serverinfo.VersionInfo          `json:"all_servers_diff_versions,omitempty"`
	AllServersInfo               map[string]*serverinfo.ServerInfo `json:"all_servers_info,omitempty"`
}

// ServeHTTP handles requests for all servers' info.
func (h AllServerInfoHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
	do, err := session.GetDomain(h.Store)
	if err != nil {
		handler.WriteError(w, errors.New("create session error"))
		log.Error("failed to get session domain", zap.Error(err))
		return
	}
	ctx := context.Background()
	allServersInfo, err := infosync.GetAllServerInfo(ctx)
	if err != nil {
		handler.WriteError(w, errors.New("ddl server information not found"))
		log.Error("failed to get all server info", zap.Error(err))
		return
	}
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
	ownerID, err := do.DDL().OwnerManager().GetOwnerID(ctx)
	cancel()
	if err != nil {
		handler.WriteError(w, errors.New("ddl server information not found"))
		log.Error("failed to get owner id", zap.Error(err))
		return
	}
	allVersionsMap := map[serverinfo.VersionInfo]struct{}{}
	allVersions := make([]serverinfo.VersionInfo, 0, len(allServersInfo))
	for _, v := range allServersInfo {
		if _, ok := allVersionsMap[v.VersionInfo]; ok {
			continue
		}
		allVersionsMap[v.VersionInfo] = struct{}{}
		allVersions = append(allVersions, v.VersionInfo)
	}
	clusterInfo := ClusterServerInfo{
		ServersNum: len(allServersInfo),
		OwnerID:    ownerID,
		// len(allVersions) == 1 indicates that there is only one TiDB version in
		// the cluster, so all server versions are consistent.
		IsAllServerVersionConsistent: len(allVersions) == 1,
		AllServersInfo:               allServersInfo,
	}
	// If IsAllServerVersionConsistent is false, return all the distinct TiDB server versions.
	if !clusterInfo.IsAllServerVersionConsistent {
		clusterInfo.AllServersDiffVersions = allVersions
	}
	handler.WriteData(w, clusterInfo)
}
|
|
|
|
// DBTableInfo is used to report the database and table information, along with the current schema version.
type DBTableInfo struct {
	DBInfo        *model.DBInfo    `json:"db_info"`
	TableInfo     *model.TableInfo `json:"table_info"`
	SchemaVersion int64            `json:"schema_version"`
}

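// For illustration (path assumed from docs/tidb_http_api.md):
//
//	curl http://{TiDBIP}:10080/db-table/{table_id}
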
// ServeHTTP handles the request for database and table information by table ID.
func (h DBTableHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	params := mux.Vars(req)
	tableID := params[handler.TableID]
	physicalID, err := strconv.Atoi(tableID)
	if err != nil {
		handler.WriteError(w, errors.Errorf("Wrong tableID: %v", tableID))
		return
	}

	schema, err := h.Schema()
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	dbTblInfo := DBTableInfo{
		SchemaVersion: schema.SchemaMetaVersion(),
	}
	tbl, ok := schema.TableByID(context.Background(), int64(physicalID))
	if ok {
		dbTblInfo.TableInfo = tbl.Meta()
		dbInfo, ok := infoschema.SchemaByTable(schema, dbTblInfo.TableInfo)
		if !ok {
			logutil.BgLogger().Error("can not find the database of the table", zap.Int64("table id", dbTblInfo.TableInfo.ID), zap.String("table name", dbTblInfo.TableInfo.Name.L))
			handler.WriteError(w, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID))
			return
		}
		dbTblInfo.DBInfo = dbInfo
		handler.WriteData(w, dbTblInfo)
		return
	}
	// The physicalID may be a partition ID of a partitioned table.
	tbl, dbInfo, _ := schema.FindTableByPartitionID(int64(physicalID))
	if tbl == nil {
		handler.WriteError(w, infoschema.ErrTableNotExists.GenWithStack("Table which ID = %s does not exist.", tableID))
		return
	}
	dbTblInfo.TableInfo = tbl.Meta()
	dbTblInfo.DBInfo = dbInfo
	handler.WriteData(w, dbTblInfo)
}

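// For illustration (path assumed from docs/tidb_http_api.md; "start" and "end"
// are RFC3339 timestamps and default to the last 10 minutes):
//
//	curl "http://{TiDBIP}:10080/metrics/profile?start=2022-02-20T10:00:00Z&end=2022-02-20T10:10:00Z"
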
// ServeHTTP handles the request for the TiDB metric profile.
func (h ProfileHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	sctx, err := session.CreateSession(h.Store)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	defer sctx.Close()

	var start, end time.Time
	if req.FormValue("end") != "" {
		end, err = time.ParseInLocation(time.RFC3339, req.FormValue("end"), sctx.GetSessionVars().Location())
		if err != nil {
			handler.WriteError(w, err)
			return
		}
	} else {
		end = time.Now()
	}
	if req.FormValue("start") != "" {
		start, err = time.ParseInLocation(time.RFC3339, req.FormValue("start"), sctx.GetSessionVars().Location())
		if err != nil {
			handler.WriteError(w, err)
			return
		}
	} else {
		start = end.Add(-time.Minute * 10)
	}
	valueTp := req.FormValue("type")
	pb, err := executor.NewProfileBuilder(sctx, start, end, valueTp)
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	err = pb.Collect()
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	_, err = w.Write(pb.Build())
	terror.Log(errors.Trace(err))
}

// TestHandler is the handler for tests. It provides some convenient APIs for integration tests.
type TestHandler struct {
	*handler.TikvHandlerTool
	gcIsRunning uint32
}

// NewTestHandler creates a new TestHandler.
func NewTestHandler(tool *handler.TikvHandlerTool, gcIsRunning uint32) *TestHandler {
	return &TestHandler{
		TikvHandlerTool: tool,
		gcIsRunning:     gcIsRunning,
	}
}

// ServeHTTP handles test-related requests.
func (h *TestHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	params := mux.Vars(req)
	mod := strings.ToLower(params["mod"])
	op := strings.ToLower(params["op"])

	switch mod {
	case "gc":
		h.handleGC(op, w, req)
	default:
		handler.WriteError(w, errors.NotSupportedf("module(%s)", mod))
	}
}

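// For illustration (the route shape is assumed; see the operation list below):
//
//	curl "http://{TiDBIP}:10080/test/gc/resolvelock?safepoint=400036290571534337&physical=true"
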
// handleGC handles GC-related test requests. Supported operations:
// - resolvelock?safepoint={uint64}&physical={bool}:
//   - safepoint: resolve all locks whose timestamp is less than the safepoint.
//   - physical: whether to use physical (green GC) mode to scan locks. Default is true.
func (h *TestHandler) handleGC(op string, w http.ResponseWriter, req *http.Request) {
	if !atomic.CompareAndSwapUint32(&h.gcIsRunning, 0, 1) {
		handler.WriteError(w, errors.New("GC is running"))
		return
	}
	defer atomic.StoreUint32(&h.gcIsRunning, 0)

	switch op {
	case "resolvelock":
		h.handleGCResolveLocks(w, req)
	default:
		handler.WriteError(w, errors.NotSupportedf("operation(%s)", op))
	}
}

func (h *TestHandler) handleGCResolveLocks(w http.ResponseWriter, req *http.Request) {
	s := req.FormValue("safepoint")
	safePoint, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		handler.WriteError(w, errors.Errorf("parse safePoint(%s) failed", s))
		return
	}
	ctx := req.Context()
	logutil.Logger(ctx).Info("start resolving locks", zap.Uint64("safePoint", safePoint))
	err = gcworker.RunResolveLocks(ctx, h.Store, h.RegionCache.PDClient(), safePoint, "testGCWorker", 3)
	if err != nil {
		handler.WriteError(w, errors.Annotate(err, "resolveLocks failed"))
	}
}

// ServeHTTP handles the request of setting the DDL hook, which is only used in tests.
func (DDLHookHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		handler.WriteError(w, errors.Errorf("this API only supports the POST method"))
		return
	}

	hook := req.FormValue("ddl_hook")
	switch hook {
	case "ctc_hook":
		err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
			log.Info("on job run before", zap.String("job", job.String()))
			// Only block the ctc (modify column) type of DDL here.
			if job.Type != model.ActionModifyColumn {
				return
			}
			switch job.SchemaState {
			case model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization:
				log.Warn(fmt.Sprintf("[DDL_HOOK] Hang for 0.5 seconds on %s state triggered", job.SchemaState.String()))
				time.Sleep(500 * time.Millisecond)
			}
		})
		if err != nil {
			handler.WriteError(w, err)
			return
		}
	case "default_hook":
		_ = failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep")
	}

	handler.WriteData(w, "success!")

	ctx := req.Context()
	logutil.Logger(ctx).Info("change ddl hook success", zap.String("to_ddl_hook", req.FormValue("ddl_hook")))
}

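// For illustration (path assumed from docs/tidb_http_api.md):
//
//	curl -X POST -H "Content-Type: application/json" -d '{"zone":"us-west-1"}' http://{TiDBIP}:10080/labels
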
// ServeHTTP handles the request of setting server labels.
func (LabelHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodPost {
		handler.WriteError(w, errors.Errorf("this API only supports the POST method"))
		return
	}

	labels := make(map[string]string)
	err := json.NewDecoder(req.Body).Decode(&labels)
	if err != nil {
		handler.WriteError(w, err)
		return
	}

	if len(labels) > 0 {
		cfg := *config.GetGlobalConfig()
		// Be careful of data races: the keys and values of cfg.Labels must not be changed in place.
		if cfg.Labels != nil {
			for k, v := range cfg.Labels {
				if _, found := labels[k]; !found {
					labels[k] = v
				}
			}
		}
		ctx, cancel := context.WithTimeout(context.Background(), requestDefaultTimeout)
		if err := infosync.UpdateServerLabel(ctx, labels); err != nil {
			logutil.BgLogger().Error("update etcd labels failed", zap.Any("labels", cfg.Labels), zap.Error(err))
		}
		cancel()
		cfg.Labels = labels
		config.StoreGlobalConfig(&cfg)
		logutil.BgLogger().Info("update server labels", zap.Any("labels", cfg.Labels))
	}

	handler.WriteData(w, config.GetGlobalConfig().Labels)
}

// IngestParam is the type for lightning ingest parameters.
type IngestParam string

const (
	// IngestParamMaxBatchSplitRanges is the parameter for lightning max_batch_split_ranges.
	IngestParamMaxBatchSplitRanges IngestParam = "max_batch_split_ranges"
	// IngestParamMaxSplitRangesPerSec is the parameter for lightning max_split_ranges_per_sec.
	IngestParamMaxSplitRangesPerSec IngestParam = "max_split_ranges_per_sec"
	// IngestParamMaxInflight is the parameter for lightning max_inflight.
	IngestParamMaxInflight IngestParam = "max_inflight"
	// IngestParamMaxPerSecond is the parameter for lightning max_per_second.
	IngestParamMaxPerSecond IngestParam = "max_per_second"
)

// IngestConcurrencyHandler is the handler for the lightning ingest parameters
// (max_batch_split_ranges, max_split_ranges_per_sec, max_inflight and max_per_second).
type IngestConcurrencyHandler struct {
	*handler.TikvHandlerTool
	param IngestParam
}

// NewIngestConcurrencyHandler creates a new IngestConcurrencyHandler.
func NewIngestConcurrencyHandler(tool *handler.TikvHandlerTool, param IngestParam) IngestConcurrencyHandler {
	return IngestConcurrencyHandler{tool, param}
}

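// The handler below speaks a small JSON protocol (the concrete route is
// registered elsewhere, so no path is shown here): a GET responds with
// {"value": <float>, "is_null": <bool>} read from the meta store, and a POST
// accepts {"value": <float>} (must be >= 0), persists it, updates the
// process-wide setting, and responds with {"message": "success"}.
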
// ServeHTTP handles requests to get or set the lightning ingest parameter selected by h.param.
func (h IngestConcurrencyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	var getter func(*meta.Mutator) (float64, bool, error)
	var setter func(*meta.Mutator, float64) error
	var updateGlobal func(v float64) float64
	switch h.param {
	case IngestParamMaxBatchSplitRanges:
		getter = func(m *meta.Mutator) (float64, bool, error) {
			v, isNull, err := m.GetIngestMaxBatchSplitRanges()
			return float64(v), isNull, err
		}
		setter = func(m *meta.Mutator, value float64) error {
			return m.SetIngestMaxBatchSplitRanges(int(value))
		}
		updateGlobal = func(v float64) float64 {
			old := local.CurrentMaxBatchSplitRanges.Load()
			intV := int(v)
			local.CurrentMaxBatchSplitRanges.Store(&intV)
			return float64(*old)
		}
	case IngestParamMaxSplitRangesPerSec:
		getter = func(m *meta.Mutator) (float64, bool, error) {
			return m.GetIngestMaxSplitRangesPerSec()
		}
		setter = func(m *meta.Mutator, value float64) error {
			return m.SetIngestMaxSplitRangesPerSec(value)
		}
		updateGlobal = func(v float64) float64 {
			old := local.CurrentMaxSplitRangesPerSec.Load()
			local.CurrentMaxSplitRangesPerSec.Store(&v)
			return *old
		}
	case IngestParamMaxPerSecond:
		getter = func(m *meta.Mutator) (float64, bool, error) {
			return m.GetIngestMaxPerSec()
		}
		setter = func(m *meta.Mutator, value float64) error {
			return m.SetIngestMaxPerSec(value)
		}
		updateGlobal = func(v float64) float64 {
			old := local.CurrentMaxIngestPerSec.Load()
			local.CurrentMaxIngestPerSec.Store(&v)
			return *old
		}
	case IngestParamMaxInflight:
		getter = func(m *meta.Mutator) (float64, bool, error) {
			v, isNull, err := m.GetIngestMaxInflight()
			return float64(v), isNull, err
		}
		setter = func(m *meta.Mutator, value float64) error {
			return m.SetIngestMaxInflight(int(value))
		}
		updateGlobal = func(v float64) float64 {
			old := local.CurrentMaxIngestInflight.Load()
			intV := int(v)
			local.CurrentMaxIngestInflight.Store(&intV)
			return float64(*old)
		}
	default:
		handler.WriteError(w, errors.Errorf("unsupported ingest parameter: %s", h.param))
		// Return early: getter/setter/updateGlobal are nil for an unknown parameter.
		return
	}
	switch req.Method {
	case http.MethodGet:
		var respValue float64
		var respIsNull bool
		err := kv.RunInNewTxn(context.Background(), h.Store.(kv.Storage), false, func(_ context.Context, txn kv.Transaction) error {
			m := meta.NewMutator(txn)
			var getErr error
			respValue, respIsNull, getErr = getter(m)
			return getErr
		})
		if err != nil {
			handler.WriteError(w, err)
			return
		}

		data := map[string]any{
			"value":   respValue,
			"is_null": respIsNull,
		}
		handler.WriteData(w, data)
	case http.MethodPost:
		var payload struct {
			Value float64 `json:"value"`
		}
		if err := json.NewDecoder(req.Body).Decode(&payload); err != nil {
			handler.WriteError(w, err)
			return
		}
		newValue := payload.Value
		if newValue < 0 {
			handler.WriteError(w, errors.New("value must be >= 0"))
			return
		}
		err := kv.RunInNewTxn(context.Background(), h.Store.(kv.Storage), true, func(_ context.Context, txn kv.Transaction) error {
			m := meta.NewMutator(txn)
			return setter(m, newValue)
		})
		if err != nil {
			handler.WriteError(w, err)
			return
		}
		oldVal := updateGlobal(newValue)
		logutil.BgLogger().Info("set ingest concurrency",
			zap.String("param", string(h.param)),
			zap.Float64("oldValue", oldVal),
			zap.Float64("newValue", newValue))
		handler.WriteData(w, map[string]string{"message": "success"})
	default:
		w.WriteHeader(http.StatusMethodNotAllowed)
		handler.WriteError(w, errors.New("method not allowed"))
	}
}

// TxnGCStatesHandler is the handler for the GC-related API.
type TxnGCStatesHandler struct {
	store kv.Storage
}

// NewTxnGCStatesHandler creates a TxnGCStatesHandler.
func NewTxnGCStatesHandler(store kv.Storage) *TxnGCStatesHandler {
	return &TxnGCStatesHandler{
		store: store,
	}
}

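// For illustration: a GET to this handler returns the GC state of this store's
// keyspace as reported by PD (the concrete route is registered elsewhere in the server).
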
// ServeHTTP implements the HTTP handler interface.
func (gc *TxnGCStatesHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != http.MethodGet {
		http.Error(w, "This API only supports GET method", http.StatusMethodNotAllowed)
		return
	}

	pdStoreBackend, ok := gc.store.(kv.StorageWithPD)
	if !ok {
		handler.WriteError(w, errors.New("GC API only supports storage with PD"))
		return
	}

	pdCli := pdStoreBackend.GetPDClient()
	keyspaceID := gc.store.GetCodec().GetKeyspaceID()
	gcCli := pdCli.GetGCStatesClient(uint32(keyspaceID))
	state, err := gcCli.GetGCState(context.Background())
	if err != nil {
		handler.WriteError(w, err)
		return
	}
	handler.WriteData(w, state)
}