Files
tidb/pkg/testkit/mockstore.go

446 lines
13 KiB
Go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//go:build !codes
package testkit
import (
"context"
"flag"
"fmt"
"os"
"runtime"
"strings"
"sync"
"sync/atomic"
"syscall"
"testing"
"time"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/schematracker"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/keyspace"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/resourcemanager"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
kvstore "github.com/pingcap/tidb/pkg/store"
"github.com/pingcap/tidb/pkg/store/driver"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/pingcap/tidb/pkg/store/mockstore/teststore"
"github.com/pingcap/tidb/pkg/testkit/testenv"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/gctuner"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/stretchr/testify/require"
"go.opencensus.io/stats/view"
"go.uber.org/zap"
)
// WithTiKV flag is only used for debugging locally with real tikv cluster.
var WithTiKV = flag.String("with-tikv", "", "address of tikv cluster, if set, running test with real tikv cluster")
// TestOption is used to customize a special tk for usage.
type TestOption func(tk *TestKit)
// WithCascades test func body under different planner mode.
func WithCascades(on bool) TestOption {
return func(tk *TestKit) {
val := "off"
if on {
val = "on"
}
tk.MustExec(fmt.Sprintf("set @@tidb_enable_cascades_planner = %s", val))
}
}
// RunTestUnderCascades run the basic test body among two different planner mode.
func RunTestUnderCascades(t *testing.T, testFunc func(t *testing.T, tk *TestKit, cascades, caller string), opts ...mockstore.MockTiKVStoreOption) {
options := []struct {
name string
opt TestOption
}{
{"off", WithCascades(false)},
{"on", WithCascades(true)},
}
// get func name
pc, _, _, ok := runtime.Caller(1)
require.True(t, ok)
details := runtime.FuncForPC(pc)
funcNameIdx := strings.LastIndex(details.Name(), ".")
funcName := details.Name()[funcNameIdx+1:]
// iter the options
for _, val := range options {
t.Run(val.name, func(t *testing.T) {
store := CreateMockStore(t, opts...)
tk := NewTestKit(t, store)
val.opt(tk)
testFunc(t, tk, val.name, funcName)
})
}
}
// RunTestUnderCascadesWithDomain run the basic test body among two different planner mode.
func RunTestUnderCascadesWithDomain(t *testing.T, testFunc func(t *testing.T, tk *TestKit, domain *domain.Domain, cascades, caller string), opts ...mockstore.MockTiKVStoreOption) {
options := []struct {
name string
opt TestOption
}{
{"off", WithCascades(false)},
{"on", WithCascades(true)},
}
// get func name
pc, _, _, ok := runtime.Caller(1)
require.True(t, ok)
details := runtime.FuncForPC(pc)
funcNameIdx := strings.LastIndex(details.Name(), ".")
funcName := details.Name()[funcNameIdx+1:]
// iter the options
for _, val := range options {
t.Run(val.name, func(t *testing.T) {
store, do := CreateMockStoreAndDomain(t, opts...)
tk := NewTestKit(t, store)
val.opt(tk)
testFunc(t, tk, do, val.name, funcName)
})
}
}
// RunTestUnderCascadesAndDomainWithSchemaLease runs the basic test body among two different planner modes. It can be used to set schema lease and store options.
func RunTestUnderCascadesAndDomainWithSchemaLease(t *testing.T, lease time.Duration, opts []mockstore.MockTiKVStoreOption, testFunc func(t *testing.T, tk *TestKit, domain *domain.Domain, cascades, caller string)) {
options := []struct {
name string
opt TestOption
}{
{"off", WithCascades(false)},
{"on", WithCascades(true)},
}
// get func name
pc, _, _, ok := runtime.Caller(1)
require.True(t, ok)
details := runtime.FuncForPC(pc)
funcNameIdx := strings.LastIndex(details.Name(), ".")
funcName := details.Name()[funcNameIdx+1:]
// iter the options
for _, val := range options {
t.Run(val.name, func(t *testing.T) {
store, do := CreateMockStoreAndDomainWithSchemaLease(t, lease, opts...)
tk := NewTestKit(t, store)
val.opt(tk)
testFunc(t, tk, do, val.name, funcName)
})
}
}
// CreateMockStore return a new mock kv.Storage.
func CreateMockStore(t testing.TB, opts ...mockstore.MockTiKVStoreOption) kv.Storage {
if *WithTiKV != "" {
var d driver.TiKVDriver
var err error
store, err := d.Open("tikv://" + *WithTiKV)
require.NoError(t, err)
config.GetGlobalConfig().Store = config.StoreTypeTiKV
require.NoError(t, ddl.StartOwnerManager(context.Background(), store))
var dom *domain.Domain
dom, err = session.BootstrapSession(store)
t.Cleanup(func() {
dom.Close()
ddl.CloseOwnerManager(store)
err := store.Close()
require.NoError(t, err)
view.Stop()
})
require.NoError(t, err)
return store
}
t.Cleanup(func() {
view.Stop()
})
gctuner.GlobalMemoryLimitTuner.Stop()
tryMakeImage(t)
store, _ := CreateMockStoreAndDomain(t, opts...)
_ = store.(helper.Storage)
return store
}
// tryMakeImage tries to create a bootstraped storage, the store is used as image for testing later.
func tryMakeImage(t testing.TB, opts ...mockstore.MockTiKVStoreOption) {
if mockstore.ImageAvailable() {
return
}
retry, err := false, error(nil)
for err == nil {
retry, err = tryMakeImageOnce(t)
if !retry {
break
}
time.Sleep(100 * time.Millisecond)
}
}
func tryMakeImageOnce(t testing.TB) (retry bool, err error) {
const lockFile = "/tmp/tidb-unistore-bootstraped-image-lock-file"
lock, err := os.Create(lockFile)
if err != nil {
return true, nil
}
defer func() { err = os.Remove(lockFile) }()
defer lock.Close()
// Prevent other process from creating the image concurrently
err = syscall.Flock(int(lock.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
if err != nil {
return true, nil
}
defer func() { err = syscall.Flock(int(lock.Fd()), syscall.LOCK_UN) }()
// Now this is the only instance to do the operation.
store, err := mockstore.NewMockStore(
mockstore.WithStoreType(mockstore.EmbedUnistore),
mockstore.WithPath(mockstore.ImageFilePath))
if err != nil {
return false, err
}
if kerneltype.IsNextGen() {
testenv.UpdateConfigForNextgen(t)
kvstore.SetSystemStorage(store)
}
vardef.SetSchemaLease(500 * time.Millisecond)
session.DisableStats4Test()
domain.DisablePlanReplayerBackgroundJob4Test()
domain.DisableDumpHistoricalStats4Test()
dom, err := session.BootstrapSession(store)
if err != nil {
return false, err
}
dom.SetStatsUpdating(true)
dom.Close()
err = store.Close()
return false, err
}
// DistExecutionContext is the context
// that used in Distributed execution test for Dist task framework and DDL.
// TODO remove it after we can start multiple DDL job scheduler separately.
type DistExecutionContext struct {
Store kv.Storage
domains []*domain.Domain
deletedDomains []*domain.Domain
t testing.TB
mu sync.Mutex
}
// TriggerOwnerChange set one mock domain to DDL Owner by idx.
func (d *DistExecutionContext) TriggerOwnerChange() {
d.mu.Lock()
defer d.mu.Unlock()
for _, dom := range d.domains {
om := dom.DDL().OwnerManager()
if om.IsOwner() {
_ = om.ResignOwner(nil)
break
}
}
}
// Close cleanup running goroutines, release resources used.
func (d *DistExecutionContext) Close() {
d.t.Cleanup(func() {
d.mu.Lock()
defer d.mu.Unlock()
gctuner.GlobalMemoryLimitTuner.Stop()
var wg tidbutil.WaitGroupWrapper
for _, dom := range d.deletedDomains {
wg.Run(dom.Close)
}
for _, dom := range d.domains {
wg.Run(dom.Close)
}
wg.Wait()
err := d.Store.Close()
require.NoError(d.t, err)
})
}
// GetDomain get domain by index.
func (d *DistExecutionContext) GetDomain(idx int) *domain.Domain {
return d.domains[idx]
}
// NewDistExecutionContext create DistExecutionContext for testing.
func NewDistExecutionContext(t testing.TB, serverNum int) *DistExecutionContext {
return NewDistExecutionContextWithLease(t, serverNum, 500*time.Millisecond)
}
// NewDistExecutionContextWithLease create DistExecutionContext for testing.
func NewDistExecutionContextWithLease(t testing.TB, serverNum int, lease time.Duration) *DistExecutionContext {
store, err := teststore.NewMockStoreWithoutBootstrap()
require.NoError(t, err)
gctuner.GlobalMemoryLimitTuner.Stop()
domains := make([]*domain.Domain, 0, serverNum)
sm := MockSessionManager{}
domInfo := make([]string, 0, serverNum)
for i := range serverNum {
dom := bootstrap4DistExecution(t, store, lease)
if i != serverNum-1 {
dom.SetOnClose(func() { /* don't delete the store in domain map */ })
}
domains = append(domains, dom)
domains[i].InfoSyncer().SetSessionManager(&sm)
domInfo = append(domInfo, dom.DDL().GetID())
}
logutil.BgLogger().Info("domain DDL IDs", zap.Strings("IDs", domInfo))
res := DistExecutionContext{
schematracker.UnwrapStorage(store), domains, []*domain.Domain{}, t, sync.Mutex{}}
return &res
}
// CreateMockStoreAndDomain return a new mock kv.Storage and *domain.Domain.
func CreateMockStoreAndDomain(t testing.TB, opts ...mockstore.MockTiKVStoreOption) (kv.Storage, *domain.Domain) {
if kerneltype.IsNextGen() {
testenv.UpdateConfigForNextgen(t)
}
store, err := mockstore.NewMockStore(opts...)
require.NoError(t, err)
dom := bootstrap(t, store, 500*time.Millisecond)
sm := MockSessionManager{}
dom.InfoSyncer().SetSessionManager(&sm)
t.Cleanup(func() {
view.Stop()
gctuner.GlobalMemoryLimitTuner.Stop()
})
store = schematracker.UnwrapStorage(store)
_ = store.(helper.Storage)
if kerneltype.IsNextGen() && store.GetKeyspace() == keyspace.System {
kvstore.SetSystemStorage(store)
}
return store, dom
}
var keyspaceIDAlloc atomic.Int32
// CreateMockStoreAndDomainForKS return a new mock kv.Storage and *domain.Domain
// some keyspace.
// this function is mainly for cross keyspace test, so normally you should use
// CreateMockStoreAndDomain instead,
func CreateMockStoreAndDomainForKS(t testing.TB, ks string, opts ...mockstore.MockTiKVStoreOption) (kv.Storage, *domain.Domain) {
intest.Assert(kerneltype.IsNextGen(), "CreateMockStoreAndDomainForKS should only be used in nextgen kernel tests")
bak := *config.GetGlobalConfig()
t.Cleanup(func() {
config.StoreGlobalConfig(&bak)
})
config.UpdateGlobal(func(conf *config.Config) {
conf.KeyspaceName = ks
})
sysKSOpt := mockstore.WithCurrentKeyspaceMeta(&keyspacepb.KeyspaceMeta{
Id: uint32(0xFFFFFF) - 1,
Name: keyspace.System,
})
ksOpt := sysKSOpt
if ks != keyspace.System {
ksOpt = mockstore.WithCurrentKeyspaceMeta(&keyspacepb.KeyspaceMeta{
Id: uint32(keyspaceIDAlloc.Add(1)),
Name: ks,
})
}
ksOpts := append(opts, ksOpt)
store, dom := CreateMockStoreAndDomain(t, ksOpts...)
if ks == keyspace.System {
kvstore.SetSystemStorage(store)
}
return store, dom
}
func bootstrap4DistExecution(t testing.TB, store kv.Storage, lease time.Duration) *domain.Domain {
vardef.SetSchemaLease(lease)
session.DisableStats4Test()
domain.DisablePlanReplayerBackgroundJob4Test()
domain.DisableDumpHistoricalStats4Test()
dom, err := session.BootstrapSession4DistExecution(store)
require.NoError(t, err)
dom.SetStatsUpdating(true)
return dom
}
func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Domain {
vardef.SetSchemaLease(lease)
session.DisableStats4Test()
domain.DisablePlanReplayerBackgroundJob4Test()
domain.DisableDumpHistoricalStats4Test()
dom, err := session.BootstrapSession(store)
require.NoError(t, err)
dom.SetStatsUpdating(true)
t.Cleanup(func() {
dom.Close()
view.Stop()
err := store.Close()
require.NoError(t, err)
resourcemanager.InstanceResourceManager.Reset()
})
return dom
}
// CreateMockStoreWithSchemaLease return a new mock kv.Storage.
func CreateMockStoreWithSchemaLease(t testing.TB, lease time.Duration, opts ...mockstore.MockTiKVStoreOption) kv.Storage {
store, _ := CreateMockStoreAndDomainWithSchemaLease(t, lease, opts...)
return schematracker.UnwrapStorage(store)
}
// CreateMockStoreAndDomainWithSchemaLease return a new mock kv.Storage and *domain.Domain.
func CreateMockStoreAndDomainWithSchemaLease(t testing.TB, lease time.Duration, opts ...mockstore.MockTiKVStoreOption) (kv.Storage, *domain.Domain) {
if kerneltype.IsNextGen() {
testenv.UpdateConfigForNextgen(t)
}
store, err := mockstore.NewMockStore(opts...)
require.NoError(t, err)
dom := bootstrap(t, store, lease)
sm := MockSessionManager{}
dom.InfoSyncer().SetSessionManager(&sm)
return schematracker.UnwrapStorage(store), dom
}
// SetTiFlashReplica is to set TiFlash replica
func SetTiFlashReplica(t testing.TB, dom *domain.Domain, dbName, tableName string) {
is := dom.InfoSchema()
tblInfo, err := is.TableByName(context.Background(), ast.NewCIStr(dbName), ast.NewCIStr(tableName))
require.NoError(t, err)
tblInfo.Meta().TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}