795 lines
26 KiB
Go
795 lines
26 KiB
Go
// Copyright 2021 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package tests
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/errors"
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/log"
|
|
"github.com/pingcap/tidb/pkg/config"
|
|
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
|
|
"github.com/pingcap/tidb/pkg/kv"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/parser/mysql"
|
|
"github.com/pingcap/tidb/pkg/server/handler"
|
|
"github.com/pingcap/tidb/pkg/server/handler/tikvhandler"
|
|
"github.com/pingcap/tidb/pkg/session"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
|
|
"github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
"github.com/pingcap/tidb/pkg/util/deadlockhistory"
|
|
"github.com/pingcap/tidb/pkg/util/versioninfo"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/tikv/pd/client/clients/gc"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
func dummyRecord() *deadlockhistory.DeadlockRecord {
|
|
return &deadlockhistory.DeadlockRecord{}
|
|
}
|
|
|
|
func TestPostSettings(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
ts.prepareData(t)
|
|
defer ts.stopServer(t)
|
|
se, err := session.CreateSession(ts.store)
|
|
require.NoError(t, err)
|
|
|
|
form := make(url.Values)
|
|
form.Set("log_level", "error")
|
|
form.Set("tidb_general_log", "1")
|
|
form.Set("tidb_enable_async_commit", "1")
|
|
form.Set("tidb_enable_1pc", "1")
|
|
resp, err := ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, zap.ErrorLevel, log.GetLevel())
|
|
require.Equal(t, "error", config.GetGlobalConfig().Log.Level)
|
|
require.True(t, vardef.ProcessGeneralLog.Load())
|
|
val, err := se.GetSessionVars().GetGlobalSystemVar(context.Background(), vardef.TiDBEnableAsyncCommit)
|
|
require.NoError(t, err)
|
|
require.Equal(t, vardef.On, val)
|
|
val, err = se.GetSessionVars().GetGlobalSystemVar(context.Background(), vardef.TiDBEnable1PC)
|
|
require.NoError(t, err)
|
|
require.Equal(t, vardef.On, val)
|
|
|
|
form = make(url.Values)
|
|
form.Set("log_level", "fatal")
|
|
form.Set("tidb_general_log", "0")
|
|
form.Set("tidb_enable_async_commit", "0")
|
|
form.Set("tidb_enable_1pc", "0")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.False(t, vardef.ProcessGeneralLog.Load())
|
|
require.Equal(t, zap.FatalLevel, log.GetLevel())
|
|
require.Equal(t, "fatal", config.GetGlobalConfig().Log.Level)
|
|
val, err = se.GetSessionVars().GetGlobalSystemVar(context.Background(), vardef.TiDBEnableAsyncCommit)
|
|
require.NoError(t, err)
|
|
require.Equal(t, vardef.Off, val)
|
|
val, err = se.GetSessionVars().GetGlobalSystemVar(context.Background(), vardef.TiDBEnable1PC)
|
|
require.NoError(t, err)
|
|
require.Equal(t, vardef.Off, val)
|
|
form.Set("log_level", os.Getenv("log_level"))
|
|
|
|
// test ddl_slow_threshold
|
|
form = make(url.Values)
|
|
form.Set("ddl_slow_threshold", "200")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, uint32(200), atomic.LoadUint32(&vardef.DDLSlowOprThreshold))
|
|
|
|
// test check_mb4_value_in_utf8
|
|
db, err := sql.Open("mysql", ts.GetDSN())
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
err := db.Close()
|
|
require.NoError(t, err)
|
|
}()
|
|
dbt := testkit.NewDBTestKit(t, db)
|
|
|
|
dbt.MustExec("create database tidb_test;")
|
|
dbt.MustExec("use tidb_test;")
|
|
dbt.MustExec("drop table if exists t2;")
|
|
dbt.MustExec("create table t2(a varchar(100) charset utf8);")
|
|
form.Set("check_mb4_value_in_utf8", "1")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, true, config.GetGlobalConfig().Instance.CheckMb4ValueInUTF8.Load())
|
|
txn1, err := dbt.GetDB().Begin()
|
|
require.NoError(t, err)
|
|
_, err = txn1.Exec("insert t2 values (unhex('F0A48BAE'));")
|
|
require.Error(t, err)
|
|
err = txn1.Commit()
|
|
require.NoError(t, err)
|
|
|
|
// Disable CheckMb4ValueInUTF8.
|
|
form = make(url.Values)
|
|
form.Set("check_mb4_value_in_utf8", "0")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, false, config.GetGlobalConfig().Instance.CheckMb4ValueInUTF8.Load())
|
|
dbt.MustExec("insert t2 values (unhex('f09f8c80'));")
|
|
|
|
// test deadlock_history_capacity
|
|
deadlockhistory.GlobalDeadlockHistory.Resize(10)
|
|
for range 10 {
|
|
deadlockhistory.GlobalDeadlockHistory.Push(dummyRecord())
|
|
}
|
|
form = make(url.Values)
|
|
form.Set("deadlock_history_capacity", "5")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 5, len(deadlockhistory.GlobalDeadlockHistory.GetAll()))
|
|
require.Equal(t, uint64(6), deadlockhistory.GlobalDeadlockHistory.GetAll()[0].ID)
|
|
require.Equal(t, uint64(10), deadlockhistory.GlobalDeadlockHistory.GetAll()[4].ID)
|
|
deadlockhistory.GlobalDeadlockHistory.Push(dummyRecord())
|
|
require.Equal(t, 5, len(deadlockhistory.GlobalDeadlockHistory.GetAll()))
|
|
require.Equal(t, uint64(7), deadlockhistory.GlobalDeadlockHistory.GetAll()[0].ID)
|
|
require.Equal(t, uint64(11), deadlockhistory.GlobalDeadlockHistory.GetAll()[4].ID)
|
|
form = make(url.Values)
|
|
form.Set("deadlock_history_capacity", "6")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
deadlockhistory.GlobalDeadlockHistory.Push(dummyRecord())
|
|
require.Equal(t, 6, len(deadlockhistory.GlobalDeadlockHistory.GetAll()))
|
|
require.Equal(t, uint64(7), deadlockhistory.GlobalDeadlockHistory.GetAll()[0].ID)
|
|
require.Equal(t, uint64(12), deadlockhistory.GlobalDeadlockHistory.GetAll()[5].ID)
|
|
|
|
// test deadlock_history_collect_retryable
|
|
form = make(url.Values)
|
|
form.Set("deadlock_history_collect_retryable", "true")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.True(t, config.GetGlobalConfig().PessimisticTxn.DeadlockHistoryCollectRetryable)
|
|
form = make(url.Values)
|
|
form.Set("deadlock_history_collect_retryable", "false")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.False(t, config.GetGlobalConfig().PessimisticTxn.DeadlockHistoryCollectRetryable)
|
|
form = make(url.Values)
|
|
form.Set("deadlock_history_collect_retryable", "123")
|
|
resp, err = ts.FormStatus("/settings", form)
|
|
require.NoError(t, err)
|
|
require.Equal(t, 400, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
|
|
// restore original value.
|
|
config.GetGlobalConfig().Instance.CheckMb4ValueInUTF8.Store(true)
|
|
}
|
|
|
|
func TestAllServerInfo(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
resp, err := ts.FetchStatus("/info/all")
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
decoder := json.NewDecoder(resp.Body)
|
|
|
|
clusterInfo := tikvhandler.ClusterServerInfo{}
|
|
err = decoder.Decode(&clusterInfo)
|
|
require.NoError(t, err)
|
|
|
|
require.True(t, clusterInfo.IsAllServerVersionConsistent)
|
|
require.Equal(t, 1, clusterInfo.ServersNum)
|
|
|
|
store := ts.server.NewTikvHandlerTool().Store.(kv.Storage)
|
|
do, err := session.GetDomain(store)
|
|
require.NoError(t, err)
|
|
ddl := do.DDL()
|
|
require.Equal(t, ddl.GetID(), clusterInfo.OwnerID)
|
|
serverInfo, ok := clusterInfo.AllServersInfo[ddl.GetID()]
|
|
require.Equal(t, true, ok)
|
|
|
|
cfg := config.GetGlobalConfig()
|
|
require.Equal(t, cfg.AdvertiseAddress, serverInfo.IP)
|
|
require.Equal(t, cfg.Status.StatusPort, serverInfo.StatusPort)
|
|
require.Equal(t, cfg.Lease, serverInfo.Lease)
|
|
require.Equal(t, mysql.ServerVersion, serverInfo.Version)
|
|
require.Equal(t, versioninfo.TiDBGitHash, serverInfo.GitHash)
|
|
require.Equal(t, ddl.GetID(), serverInfo.ID)
|
|
}
|
|
|
|
func TestRegionsFromMeta(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
resp, err := ts.FetchStatus("/regions/meta")
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
|
|
// Verify the resp body.
|
|
decoder := json.NewDecoder(resp.Body)
|
|
metas := make([]handler.RegionMeta, 0)
|
|
err = decoder.Decode(&metas)
|
|
require.NoError(t, err)
|
|
for _, m := range metas {
|
|
require.True(t, m.ID != 0)
|
|
}
|
|
|
|
// test no panic
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/errGetRegionByIDEmpty", `return(true)`))
|
|
defer func() {
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/errGetRegionByIDEmpty"))
|
|
}()
|
|
resp1, err := ts.FetchStatus("/regions/meta")
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp1.Body.Close()) }()
|
|
}
|
|
|
|
func TestTiFlashReplica(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
ts.prepareData(t)
|
|
defer ts.stopServer(t)
|
|
|
|
db, err := sql.Open("mysql", ts.GetDSN())
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
err := db.Close()
|
|
require.NoError(t, err)
|
|
}()
|
|
dbt := testkit.NewDBTestKit(t, db)
|
|
|
|
defer func(originGC bool) {
|
|
if originGC {
|
|
ddlutil.EmulatorGCEnable()
|
|
} else {
|
|
ddlutil.EmulatorGCDisable()
|
|
}
|
|
}(ddlutil.IsEmulatorGCEnable())
|
|
|
|
// Disable emulator GC.
|
|
// Otherwise emulator GC will delete table record as soon as possible after execute drop table DDL.
|
|
ddlutil.EmulatorGCDisable()
|
|
gcTimeFormat := "20060102-15:04:05 -0700 MST"
|
|
timeBeforeDrop := time.Now().Add(0 - 48*60*60*time.Second).Format(gcTimeFormat)
|
|
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', ''),('tikv_gc_enable','true','')
|
|
ON DUPLICATE KEY
|
|
UPDATE variable_value = '%[1]s'`
|
|
// Set GC safe point and enable GC.
|
|
dbt.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
|
|
|
resp, err := ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var data []tikvhandler.TableFlashReplicaInfo
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 0, len(data))
|
|
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`))
|
|
defer func() {
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount"))
|
|
}()
|
|
|
|
dbt.MustExec("use tidb")
|
|
dbt.MustExec("alter table test set tiflash replica 2 location labels 'a','b';")
|
|
|
|
resp, err = ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 1, len(data))
|
|
require.Equal(t, uint64(2), data[0].ReplicaCount)
|
|
require.Equal(t, "a,b", strings.Join(data[0].LocationLabels, ","))
|
|
require.Equal(t, false, data[0].Available)
|
|
|
|
resp, err = ts.PostStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(`{"id":184,"region_count":3,"flash_region_count":3}`)))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp)
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, "[schema:1146]Table which ID = 184 does not exist.", string(body))
|
|
|
|
tbl, err := ts.domain.InfoSchema().TableByName(context.Background(), ast.NewCIStr("tidb"), ast.NewCIStr("test"))
|
|
require.NoError(t, err)
|
|
req := fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, tbl.Meta().ID)
|
|
resp, err = ts.PostStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(req)))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp)
|
|
body, err = io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, "", string(body))
|
|
|
|
resp, err = ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 1, len(data))
|
|
require.Equal(t, uint64(2), data[0].ReplicaCount)
|
|
require.Equal(t, "a,b", strings.Join(data[0].LocationLabels, ","))
|
|
require.Equal(t, true, data[0].Available)
|
|
|
|
// Should not take effect.
|
|
dbt.MustExec("alter table test set tiflash replica 2 location labels 'a','b';")
|
|
checkFunc := func() {
|
|
resp, err := ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 1, len(data))
|
|
require.Equal(t, uint64(2), data[0].ReplicaCount)
|
|
require.Equal(t, "a,b", strings.Join(data[0].LocationLabels, ","))
|
|
require.Equal(t, true, data[0].Available)
|
|
}
|
|
|
|
// Test for get dropped table tiflash replica info.
|
|
dbt.MustExec("drop table test")
|
|
checkFunc()
|
|
|
|
// Test unique table id replica info.
|
|
dbt.MustExec("flashback table test")
|
|
checkFunc()
|
|
dbt.MustExec("drop table test")
|
|
checkFunc()
|
|
dbt.MustExec("flashback table test")
|
|
checkFunc()
|
|
|
|
// Test for partition table.
|
|
dbt.MustExec("alter table pt set tiflash replica 2 location labels 'a','b';")
|
|
dbt.MustExec("alter table test set tiflash replica 0;")
|
|
resp, err = ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, 3, len(data))
|
|
require.Equal(t, uint64(2), data[0].ReplicaCount)
|
|
require.Equal(t, "a,b", strings.Join(data[0].LocationLabels, ","))
|
|
require.Equal(t, false, data[0].Available)
|
|
|
|
pid0 := data[0].ID
|
|
pid1 := data[1].ID
|
|
pid2 := data[2].ID
|
|
|
|
// Mock for partition 1 replica was available.
|
|
req = fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, pid1)
|
|
resp, err = ts.PostStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(req)))
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
resp, err = ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 3, len(data))
|
|
require.Equal(t, false, data[0].Available)
|
|
require.Equal(t, true, data[1].Available)
|
|
require.Equal(t, false, data[2].Available)
|
|
|
|
// Mock for partition 0,2 replica was available.
|
|
req = fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, pid0)
|
|
resp, err = ts.PostStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(req)))
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
req = fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, pid2)
|
|
resp, err = ts.PostStatus("/tiflash/replica-deprecated", "application/json", bytes.NewBuffer([]byte(req)))
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
checkFunc = func() {
|
|
resp, err := ts.FetchStatus("/tiflash/replica-deprecated")
|
|
require.NoError(t, err)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&data)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Equal(t, 3, len(data))
|
|
require.Equal(t, true, data[0].Available)
|
|
require.Equal(t, true, data[1].Available)
|
|
require.Equal(t, true, data[2].Available)
|
|
}
|
|
|
|
// Test for get truncated table tiflash replica info.
|
|
dbt.MustExec("truncate table pt")
|
|
dbt.MustExec("alter table pt set tiflash replica 0;")
|
|
checkFunc()
|
|
}
|
|
|
|
func TestDebugRoutes(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
|
|
debugRoutes := []string{
|
|
"/debug/pprof/",
|
|
"/debug/pprof/heap?debug=1",
|
|
"/debug/pprof/goroutine?debug=1",
|
|
"/debug/pprof/goroutine?debug=2",
|
|
"/debug/pprof/allocs?debug=1",
|
|
"/debug/pprof/block?debug=1",
|
|
"/debug/pprof/threadcreate?debug=1",
|
|
"/debug/pprof/cmdline",
|
|
"/debug/pprof/profile?seconds=5",
|
|
"/debug/pprof/mutex?debug=1",
|
|
"/debug/pprof/symbol",
|
|
"/debug/pprof/trace",
|
|
"/debug/gogc",
|
|
// "/debug/zip", // this creates unexpected goroutines which will make goleak complain, so we skip it for now
|
|
"/debug/ballast-object-sz",
|
|
}
|
|
for _, route := range debugRoutes {
|
|
resp, err := ts.FetchStatus(route)
|
|
require.NoError(t, err, fmt.Sprintf("GET route %s failed", route))
|
|
require.Equal(t, http.StatusOK, resp.StatusCode, fmt.Sprintf("GET route %s failed", route))
|
|
require.NoError(t, resp.Body.Close())
|
|
}
|
|
}
|
|
|
|
func TestFailpointHandler(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
|
|
// start server without enabling failpoint integration
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
resp, err := ts.FetchStatus("/fail/")
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
ts.stopServer(t)
|
|
|
|
// enable failpoint integration and start server
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/enableTestAPI", "return"))
|
|
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/enableTestAPI")) }()
|
|
ts.startServer(t)
|
|
resp, err = ts.FetchStatus("/fail/")
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
b, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
require.True(t, strings.Contains(string(b), "github.com/pingcap/tidb/pkg/server/enableTestAPI=return"))
|
|
require.NoError(t, resp.Body.Close())
|
|
}
|
|
|
|
func TestTestHandler(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
|
|
// start server without enabling failpoint integration
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
resp, err := ts.FetchStatus("/test")
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
|
require.NoError(t, resp.Body.Close())
|
|
ts.stopServer(t)
|
|
|
|
// enable failpoint integration and start server
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/enableTestAPI", "return"))
|
|
defer func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/enableTestAPI")) }()
|
|
ts.startServer(t)
|
|
|
|
resp, err = ts.FetchStatus("/test/gc/gc")
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus("/test/gc/resolvelock")
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus("/test/gc/resolvelock?safepoint=a")
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus("/test/gc/resolvelock?physical=1")
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus("/test/gc/resolvelock?physical=true")
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusBadRequest, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus("/test/gc/resolvelock?safepoint=10000&physical=true")
|
|
require.NoError(t, err)
|
|
err = resp.Body.Close()
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
}
|
|
|
|
func TestServerInfo(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
resp, err := ts.FetchStatus("/info")
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
decoder := json.NewDecoder(resp.Body)
|
|
|
|
info := tikvhandler.ServerInfo{}
|
|
err = decoder.Decode(&info)
|
|
require.NoError(t, err)
|
|
|
|
cfg := config.GetGlobalConfig()
|
|
require.True(t, info.IsOwner)
|
|
require.Equal(t, cfg.AdvertiseAddress, info.IP)
|
|
require.Equal(t, cfg.Status.StatusPort, info.StatusPort)
|
|
require.Equal(t, cfg.Lease, info.Lease)
|
|
require.Equal(t, mysql.ServerVersion, info.Version)
|
|
require.Equal(t, versioninfo.TiDBGitHash, info.GitHash)
|
|
|
|
store := ts.server.NewTikvHandlerTool().Store.(kv.Storage)
|
|
do, err := session.GetDomain(store)
|
|
require.NoError(t, err)
|
|
d := do.DDL()
|
|
require.Equal(t, d.GetID(), info.ID)
|
|
}
|
|
|
|
func TestGetSchemaStorage(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
ts.prepareData(t)
|
|
defer ts.stopServer(t)
|
|
|
|
do := ts.domain
|
|
h := do.StatsHandle()
|
|
do.SetStatsUpdating(true)
|
|
|
|
tk := testkit.NewTestKit(t, ts.store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("drop table if exists t")
|
|
tk.MustExec("create table t (c int, d int, e char(5), index idx(e))")
|
|
testutil.HandleNextDDLEventWithTxn(h)
|
|
tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`)
|
|
h.FlushStats()
|
|
|
|
resp, err := ts.FetchStatus("/schema_storage/test")
|
|
require.NoError(t, err)
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var tables []*tikvhandler.SchemaTableStorage
|
|
err = decoder.Decode(&tables)
|
|
require.NoError(t, err)
|
|
require.NoError(t, resp.Body.Close())
|
|
require.Len(t, tables, 1)
|
|
expects := []string{`t`}
|
|
names := make([]string, len(tables))
|
|
for i, v := range tables {
|
|
names[i] = v.TableName
|
|
}
|
|
|
|
sort.Strings(names)
|
|
require.Equal(t, expects, names)
|
|
require.Equal(t, []int64{3, 16, 48, 0, 0, 0}, []int64{
|
|
tables[0].TableRows,
|
|
tables[0].AvgRowLength,
|
|
tables[0].DataLength,
|
|
tables[0].MaxDataLength,
|
|
tables[0].IndexLength,
|
|
tables[0].DataFree,
|
|
})
|
|
}
|
|
|
|
func TestTTL(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
|
|
db, err := sql.Open("mysql", ts.GetDSN())
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
err := db.Close()
|
|
require.NoError(t, err)
|
|
}()
|
|
dbt := testkit.NewDBTestKit(t, db)
|
|
dbt.MustExec("create database test_ttl")
|
|
dbt.MustExec("use test_ttl")
|
|
dbt.MustExec("create table t1(t timestamp) TTL=`t` + interval 1 day")
|
|
|
|
getJobCnt := func(status string) int {
|
|
selectSQL := "select count(1) from mysql.tidb_ttl_job_history"
|
|
if status != "" {
|
|
selectSQL += " where status = '" + status + "'"
|
|
}
|
|
|
|
rs, err := db.Query(selectSQL)
|
|
require.NoError(t, err)
|
|
defer func() {
|
|
require.NoError(t, rs.Close())
|
|
}()
|
|
|
|
cnt := -1
|
|
rowNum := 0
|
|
for rs.Next() {
|
|
rowNum++
|
|
require.Equal(t, 1, rowNum)
|
|
require.NoError(t, rs.Scan(&cnt))
|
|
}
|
|
require.NoError(t, rs.Err())
|
|
return cnt
|
|
}
|
|
|
|
waitAllJobsFinish := func() {
|
|
start := time.Now()
|
|
for time.Since(start) < time.Minute {
|
|
cnt := getJobCnt("running")
|
|
if cnt == 0 {
|
|
return
|
|
}
|
|
}
|
|
require.Fail(t, "timeout for waiting job finished")
|
|
}
|
|
|
|
doTrigger := func(db, tb string) (map[string]any, error) {
|
|
resp, err := ts.PostStatus(fmt.Sprintf("/test/ttl/trigger/%s/%s", db, tb), "application/json", nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
defer func() {
|
|
require.NoError(t, resp.Body.Close())
|
|
}()
|
|
|
|
body, err := io.ReadAll(resp.Body)
|
|
require.NoError(t, err)
|
|
|
|
if resp.StatusCode != 200 {
|
|
return nil, errors.Errorf("http status: %s, %s", resp.Status, body)
|
|
}
|
|
|
|
var obj map[string]any
|
|
require.NoError(t, json.Unmarshal(body, &obj))
|
|
return obj, nil
|
|
}
|
|
|
|
expectedJobCnt := 1
|
|
obj, err := doTrigger("test_ttl", "t1")
|
|
require.NoError(t, err)
|
|
if err != nil {
|
|
// if error returns, may be a job is running, we should skip it and have a next try when it stopped
|
|
require.Equal(t, expectedJobCnt, getJobCnt(""))
|
|
waitAllJobsFinish()
|
|
obj, err = doTrigger("test_ttl", "t1")
|
|
require.NoError(t, err)
|
|
expectedJobCnt++
|
|
}
|
|
|
|
_, ok := obj["table_result"]
|
|
require.True(t, ok)
|
|
require.Equal(t, expectedJobCnt, getJobCnt(""))
|
|
|
|
// error case, table not exist
|
|
obj, err = doTrigger("test_ttl", "t2")
|
|
require.Nil(t, obj)
|
|
require.EqualError(t, err, "http status: 400 Bad Request, table test_ttl.t2 not exists")
|
|
}
|
|
|
|
func TestGC(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
|
|
var data url.Values
|
|
resp, err := ts.FormStatus("/txn-gc-states", data)
|
|
require.NoError(t, err)
|
|
require.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus("/txn-gc-states")
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
|
|
// Verify the resp body.
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var state gc.GCState
|
|
err = decoder.Decode(&state)
|
|
require.NoError(t, err)
|
|
|
|
var empty gc.GCState
|
|
require.NotEqual(t, empty, state)
|
|
}
|
|
|
|
func TestIngestParam(t *testing.T) {
|
|
ts := createBasicHTTPHandlerTestSuite()
|
|
ts.startServer(t)
|
|
defer ts.stopServer(t)
|
|
|
|
testCases := []struct {
|
|
url string
|
|
defaultVal any
|
|
modVal any
|
|
expectedVal any
|
|
}{
|
|
{"/ingest/max-batch-split-ranges", float64(2048), 1000, float64(1000)},
|
|
{"/ingest/max-split-ranges-per-sec", float64(0), 2000, float64(2000)},
|
|
{"/ingest/max-ingest-inflight", float64(0), 1000, float64(1000)},
|
|
{"/ingest/max-ingest-per-sec", float64(0), 2000, float64(2000)},
|
|
}
|
|
|
|
for _, tc := range testCases {
|
|
t.Run(tc.url, func(t *testing.T) {
|
|
resp, err := ts.FetchStatus(tc.url)
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
decoder := json.NewDecoder(resp.Body)
|
|
var payload struct {
|
|
Value float64 `json:"value"`
|
|
IsNull bool `json:"is_null"`
|
|
}
|
|
err = decoder.Decode(&payload)
|
|
require.NoError(t, err)
|
|
require.Equal(t, tc.defaultVal, payload.Value)
|
|
|
|
resp, err = ts.PostStatus(tc.url, "", bytes.NewBuffer([]byte(fmt.Sprintf(`{"value": %v}`, tc.modVal))))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, resp)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
|
|
resp, err = ts.FetchStatus(tc.url)
|
|
require.NoError(t, err)
|
|
defer func() { require.NoError(t, resp.Body.Close()) }()
|
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
|
decoder = json.NewDecoder(resp.Body)
|
|
err = decoder.Decode(&payload)
|
|
require.NoError(t, err)
|
|
require.Equal(t, tc.expectedVal, payload.Value)
|
|
})
|
|
}
|
|
}
|