mdl: always enable and make it read only in nextgen (#62865)
ref pingcap/tidb#61702
This commit is contained in:
@ -1137,7 +1137,7 @@ func (d *ddl) cleanDeadTableLock(unlockTables []model.TableLockTpInfo, se model.
|
||||
|
||||
// SwitchMDL enables MDL or disable MDL.
|
||||
func (d *ddl) SwitchMDL(enable bool) error {
|
||||
isEnableBefore := vardef.EnableMDL.Load()
|
||||
isEnableBefore := vardef.IsMDLEnabled()
|
||||
if isEnableBefore == enable {
|
||||
return nil
|
||||
}
|
||||
@ -1161,7 +1161,7 @@ func (d *ddl) SwitchMDL(enable bool) error {
|
||||
return errors.New("please wait for all jobs done")
|
||||
}
|
||||
|
||||
vardef.EnableMDL.Store(enable)
|
||||
vardef.SetEnableMDL(enable)
|
||||
err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), d.store, true, func(_ context.Context, txn kv.Transaction) error {
|
||||
m := meta.NewMutator(txn)
|
||||
oldEnable, _, err := m.GetMetadataLock()
|
||||
@ -1185,7 +1185,7 @@ func (d *ddl) SwitchMDL(enable bool) error {
|
||||
// It should be called before any DDL that could break data consistency.
|
||||
// This provides a safe window for async commit and 1PC to commit with an old schema.
|
||||
func delayForAsyncCommit() {
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
// If metadata lock is enabled. The transaction of DDL must begin after
|
||||
// pre-write of the async commit transaction, then the commit ts of DDL
|
||||
// must be greater than the async commit transaction. In this case, the
|
||||
|
||||
@ -566,7 +566,7 @@ func (s *jobScheduler) transitOneJobStepAndWaitSync(wk *worker, jobCtx *jobConte
|
||||
// current owner.
|
||||
job := jobW.Job
|
||||
if jobCtx.isUnSynced(job.ID) || (job.Started() && !jobCtx.maybeAlreadyRunOnce(job.ID)) {
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
version, err := s.sysTblMgr.GetMDLVer(s.schCtx, job.ID)
|
||||
if err == nil {
|
||||
jobCtx.logger.Info("the job have schema version un-synced",
|
||||
@ -624,7 +624,7 @@ func (s *jobScheduler) cleanMDLInfo(job *model.Job, ownerID string) {
|
||||
defer func() {
|
||||
metrics.DDLCleanMDLInfoHist.Observe(time.Since(start).Seconds())
|
||||
}()
|
||||
if !vardef.EnableMDL.Load() {
|
||||
if !vardef.IsMDLEnabled() {
|
||||
return
|
||||
}
|
||||
var sql string
|
||||
|
||||
@ -308,7 +308,7 @@ func (w *worker) updateDDLJob(jobCtx *jobContext, job *model.Job, updateRawArgs
|
||||
|
||||
// registerMDLInfo registers metadata lock info.
|
||||
func (w *worker) registerMDLInfo(job *model.Job, ver int64) error {
|
||||
if !vardef.EnableMDL.Load() {
|
||||
if !vardef.IsMDLEnabled() {
|
||||
return nil
|
||||
}
|
||||
if ver == 0 {
|
||||
@ -1124,7 +1124,7 @@ func updateGlobalVersionAndWaitSynced(
|
||||
err = jobCtx.schemaVerSyncer.OwnerUpdateGlobalVersion(ctx, latestSchemaVersion)
|
||||
if err != nil {
|
||||
logutil.DDLLogger().Info("update latest schema version failed", zap.Int64("ver", latestSchemaVersion), zap.Error(err))
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
return err
|
||||
}
|
||||
if terror.ErrorEqual(err, context.DeadlineExceeded) {
|
||||
|
||||
@ -68,7 +68,7 @@ func (s *MemSyncer) UpdateSelfVersion(_ context.Context, jobID int64, version in
|
||||
failpoint.Return(errors.New("mock update mdl to etcd error"))
|
||||
}
|
||||
})
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
s.mdlSchemaVersions.Store(jobID, version)
|
||||
} else {
|
||||
atomic.StoreInt64(&s.selfSchemaVersion, version)
|
||||
@ -117,7 +117,7 @@ func (s *MemSyncer) WaitVersionSynced(ctx context.Context, jobID int64, latestVe
|
||||
case <-ctx.Done():
|
||||
return errors.Trace(ctx.Err())
|
||||
case <-ticker.C:
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
ver, ok := s.mdlSchemaVersions.Load(jobID)
|
||||
if ok && ver.(int64) >= latestVer {
|
||||
return nil
|
||||
|
||||
@ -282,7 +282,7 @@ func (s *etcdSyncer) UpdateSelfVersion(ctx context.Context, jobID int64, version
|
||||
ver := strconv.FormatInt(version, 10)
|
||||
var err error
|
||||
var path string
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
// If jobID is 0, it doesn't need to put into etcd `DDLAllSchemaVersionsByJob` key.
|
||||
if jobID == 0 {
|
||||
return nil
|
||||
@ -325,7 +325,7 @@ func (s *etcdSyncer) removeSelfVersionPath() error {
|
||||
// WaitVersionSynced implements Syncer.WaitVersionSynced interface.
|
||||
func (s *etcdSyncer) WaitVersionSynced(ctx context.Context, jobID int64, latestVer int64, checkAssumedSvr bool) error {
|
||||
startTime := time.Now()
|
||||
if !vardef.EnableMDL.Load() {
|
||||
if !vardef.IsMDLEnabled() {
|
||||
time.Sleep(CheckVersFirstWaitTime)
|
||||
}
|
||||
notMatchVerCnt := 0
|
||||
@ -348,7 +348,7 @@ func (s *etcdSyncer) WaitVersionSynced(ctx context.Context, jobID int64, latestV
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
serverInfos, err := infosync.GetServersForISSync(ctx, checkAssumedSvr)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -375,7 +375,7 @@ func (s *etcdSyncer) WaitVersionSynced(ctx context.Context, jobID int64, latestV
|
||||
}
|
||||
|
||||
// Check all schema versions.
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
notifyCh := make(chan struct{})
|
||||
var unmatchedNodeInfo atomic.Pointer[string]
|
||||
matchFn := func(nodeVersions map[string]int64) bool {
|
||||
|
||||
@ -174,7 +174,7 @@ func TestSyncJobSchemaVerLoop(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// job 4 is matched using WaitVersionSynced
|
||||
vardef.EnableMDL.Store(true)
|
||||
vardef.SetEnableMDL(true)
|
||||
serverInfos := map[string]*serverinfo.ServerInfo{"aa": {StaticInfo: serverinfo.StaticInfo{ID: "aa", IP: "test", Port: 4000}}}
|
||||
bytes, err := json.Marshal(serverInfos)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -39,7 +39,7 @@ import (
|
||||
const minInterval = 10 * time.Nanosecond // It's used to test timeout.
|
||||
|
||||
func TestSyncerSimple(t *testing.T) {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
|
||||
}
|
||||
|
||||
@ -43,7 +43,7 @@ func checkRespKV(t *testing.T, kvCount int, key, val string, kvs ...*mvccpb.KeyV
|
||||
}
|
||||
|
||||
func TestStateSyncerSimple(t *testing.T) {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
if runtime.GOOS == "windows" {
|
||||
t.Skip("integration.NewClusterV3 will create file contains a colon which is not allowed on Windows")
|
||||
}
|
||||
|
||||
@ -8,9 +8,10 @@ go_test(
|
||||
"mdl_test.go",
|
||||
],
|
||||
flaky = True,
|
||||
shard_count = 37,
|
||||
shard_count = 38,
|
||||
deps = [
|
||||
"//pkg/config",
|
||||
"//pkg/config/kerneltype",
|
||||
"//pkg/ddl",
|
||||
"//pkg/ddl/ingest/testutil",
|
||||
"//pkg/errno",
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/failpoint"
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/ddl"
|
||||
ingesttestutil "github.com/pingcap/tidb/pkg/ddl/ingest/testutil"
|
||||
mysql "github.com/pingcap/tidb/pkg/errno"
|
||||
@ -1159,6 +1160,9 @@ func TestMDLEnable2Disable(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSwitchMDL(t *testing.T) {
|
||||
if kerneltype.IsNextGen() {
|
||||
t.Skip("MDL is always enabled and read only in nextgen")
|
||||
}
|
||||
store, dom := testkit.CreateMockStoreAndDomain(t)
|
||||
sv := server.CreateMockServer(t, store)
|
||||
|
||||
@ -1179,6 +1183,19 @@ func TestSwitchMDL(t *testing.T) {
|
||||
tk.MustQuery("show global variables like 'tidb_enable_metadata_lock'").Check(testkit.Rows("tidb_enable_metadata_lock OFF"))
|
||||
}
|
||||
|
||||
func TestSetMDLInNextGen(t *testing.T) {
|
||||
if kerneltype.IsClassic() {
|
||||
t.Skip("only run in nextgen")
|
||||
}
|
||||
store, _ := testkit.CreateMockStoreAndDomain(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
|
||||
require.ErrorContains(t, tk.ExecToErr("set global tidb_enable_metadata_lock=0"),
|
||||
"setting tidb_enable_metadata_lock is not supported in the next generation of TiDB")
|
||||
require.ErrorContains(t, tk.ExecToErr("set global tidb_enable_metadata_lock=1"),
|
||||
"setting tidb_enable_metadata_lock is not supported in the next generation of TiDB")
|
||||
}
|
||||
|
||||
func TestMDLViewItself(t *testing.T) {
|
||||
store, dom := testkit.CreateMockStoreAndDomain(t)
|
||||
sv := server.CreateMockServer(t, store)
|
||||
|
||||
@ -226,7 +226,7 @@ func (s *Syncer) MDLCheckLoop(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
if !vardef.EnableMDL.Load() {
|
||||
if !vardef.IsMDLEnabled() {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@ -233,7 +233,7 @@ func (v *validator) Check(txnTS uint64, schemaVer int64, relatedPhysicalTableIDs
|
||||
// is true, we need to check by schema delta.
|
||||
// When switch MDL from on to off, the needCheckSchemaByDelta is false,
|
||||
// and EnableMDL is also false, so we still need to check by schema delta.
|
||||
if needCheckSchemaByDelta || !vardef.EnableMDL.Load() {
|
||||
if needCheckSchemaByDelta || !vardef.IsMDLEnabled() {
|
||||
changed := v.isRelatedTablesChanged(schemaVer, relatedPhysicalTableIDs)
|
||||
if changed {
|
||||
return nil, validatorapi.ResultFail
|
||||
|
||||
@ -507,7 +507,7 @@ func TestDupRandJoinCondsPushDown(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTablePartition(t *testing.T) {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
definitions := []model.PartitionDefinition{
|
||||
{
|
||||
ID: 41,
|
||||
@ -1127,7 +1127,7 @@ func TestAggPrune(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestVisitInfo(t *testing.T) {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
tests := []struct {
|
||||
sql string
|
||||
ans []visitInfo
|
||||
|
||||
@ -98,7 +98,7 @@ func TestGroupExists(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGroupFingerPrint(t *testing.T) {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
p := parser.New()
|
||||
stmt1, err := p.ParseOneStmt("select * from t where a > 1 and a < 100", "", "")
|
||||
require.NoError(t, err)
|
||||
@ -230,7 +230,7 @@ func TestFirstElemAfterDelete(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBuildKeyInfo(t *testing.T) {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
p := parser.New()
|
||||
ctx := plannercore.MockContext()
|
||||
defer func() {
|
||||
|
||||
@ -3417,7 +3417,7 @@ func InitMDLVariableForBootstrap(store kv.Storage) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
vardef.EnableMDL.Store(true)
|
||||
vardef.SetEnableMDL(true)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -3461,9 +3461,9 @@ func InitMDLVariableForUpgrade(store kv.Storage) (bool, error) {
|
||||
return nil
|
||||
})
|
||||
if isNull || !enable {
|
||||
vardef.EnableMDL.Store(false)
|
||||
vardef.SetEnableMDL(false)
|
||||
} else {
|
||||
vardef.EnableMDL.Store(true)
|
||||
vardef.SetEnableMDL(true)
|
||||
}
|
||||
return isNull, err
|
||||
}
|
||||
@ -3490,7 +3490,7 @@ func InitMDLVariable(store kv.Storage) error {
|
||||
}
|
||||
return nil
|
||||
})
|
||||
vardef.EnableMDL.Store(enable)
|
||||
vardef.SetEnableMDL(enable)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "vardef",
|
||||
@ -11,6 +11,7 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/config",
|
||||
"//pkg/config/kerneltype",
|
||||
"//pkg/executor/join/joinversion",
|
||||
"//pkg/parser/mysql",
|
||||
"//pkg/util/memory",
|
||||
@ -20,3 +21,15 @@ go_library(
|
||||
"@org_uber_go_atomic//:atomic",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "vardef_test",
|
||||
timeout = "short",
|
||||
srcs = ["tidb_vars_test.go"],
|
||||
embed = [":vardef"],
|
||||
flaky = True,
|
||||
deps = [
|
||||
"//pkg/config/kerneltype",
|
||||
"@com_github_stretchr_testify//require",
|
||||
],
|
||||
)
|
||||
|
||||
@ -23,6 +23,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/config"
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/executor/join/joinversion"
|
||||
"github.com/pingcap/tidb/pkg/parser/mysql"
|
||||
"github.com/pingcap/tidb/pkg/util/memory"
|
||||
@ -1732,7 +1733,7 @@ var (
|
||||
EnableDistTask = atomic.NewBool(DefTiDBEnableDistTask)
|
||||
EnableFastCreateTable = atomic.NewBool(DefTiDBEnableFastCreateTable)
|
||||
EnableNoopVariables = atomic.NewBool(DefTiDBEnableNoopVariables)
|
||||
EnableMDL = atomic.NewBool(false)
|
||||
enableMDL = atomic.NewBool(false)
|
||||
AutoAnalyzePartitionBatchSize = atomic.NewInt64(DefTiDBAutoAnalyzePartitionBatchSize)
|
||||
AutoAnalyzeConcurrency = atomic.NewInt32(DefTiDBAutoAnalyzeConcurrency)
|
||||
// TODO: set value by session variable
|
||||
@ -2133,3 +2134,22 @@ func SetMaxDeltaSchemaCount(cnt int64) {
|
||||
func GetMaxDeltaSchemaCount() int64 {
|
||||
return goatomic.LoadInt64(&MaxDeltaSchemaCount)
|
||||
}
|
||||
|
||||
// IsMDLEnabled returns if MDL is enabled.
|
||||
func IsMDLEnabled() bool {
|
||||
if kerneltype.IsNextGen() {
|
||||
// MDL is very useful to avoid the 'Information schema is changed' error,
|
||||
// in next-gen TiDB, MDL is always enabled, as we don't have the compatibility
|
||||
// debts.
|
||||
// some tests might call SetEnableMDL(false) to disable MDL, but it is not
|
||||
// expected in nextgen, we use this branch to ensure MDL is always enabled,
|
||||
// even in test.
|
||||
return true
|
||||
}
|
||||
return enableMDL.Load()
|
||||
}
|
||||
|
||||
// SetEnableMDL sets the MDL enable status.
|
||||
func SetEnableMDL(enabled bool) {
|
||||
enableMDL.Store(enabled)
|
||||
}
|
||||
|
||||
38
pkg/sessionctx/vardef/tidb_vars_test.go
Normal file
38
pkg/sessionctx/vardef/tidb_vars_test.go
Normal file
@ -0,0 +1,38 @@
|
||||
// Copyright 2025 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package vardef
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIsMDLEnabledInNextGen(t *testing.T) {
|
||||
if kerneltype.IsClassic() {
|
||||
t.Skip("only run in next-gen")
|
||||
}
|
||||
bak := enableMDL.Load()
|
||||
t.Cleanup(func() {
|
||||
SetEnableMDL(bak)
|
||||
})
|
||||
|
||||
// IsMDLEnabled must return true regardless of the value of enableMDL.
|
||||
SetEnableMDL(false)
|
||||
require.True(t, IsMDLEnabled())
|
||||
SetEnableMDL(true)
|
||||
require.True(t, IsMDLEnabled())
|
||||
}
|
||||
@ -1492,7 +1492,10 @@ var defaultSysVars = []*SysVar{
|
||||
},
|
||||
},
|
||||
{Scope: vardef.ScopeGlobal, Name: vardef.TiDBEnableMDL, Value: BoolToOnOff(vardef.DefTiDBEnableMDL), Type: vardef.TypeBool, SetGlobal: func(_ context.Context, vars *SessionVars, val string) error {
|
||||
if vardef.EnableMDL.Load() != TiDBOptOn(val) {
|
||||
if kerneltype.IsNextGen() {
|
||||
return errNotSupportedInNextGen.FastGenByArgs(fmt.Sprintf("setting %s", vardef.TiDBEnableMDL))
|
||||
}
|
||||
if vardef.IsMDLEnabled() != TiDBOptOn(val) {
|
||||
err := SwitchMDL(TiDBOptOn(val))
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1500,7 +1503,7 @@ var defaultSysVars = []*SysVar{
|
||||
}
|
||||
return nil
|
||||
}, GetGlobal: func(_ context.Context, vars *SessionVars) (string, error) {
|
||||
return BoolToOnOff(vardef.EnableMDL.Load()), nil
|
||||
return BoolToOnOff(vardef.IsMDLEnabled()), nil
|
||||
}},
|
||||
{Scope: vardef.ScopeGlobal, Name: vardef.TiDBEnableDistTask, Value: BoolToOnOff(vardef.DefTiDBEnableDistTask), Type: vardef.TypeBool, SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
|
||||
if vardef.EnableDistTask.Load() != TiDBOptOn(val) {
|
||||
|
||||
@ -133,7 +133,7 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
|
||||
sessVars.TxnCtxMu.Lock()
|
||||
sessVars.TxnCtx = txnCtx
|
||||
sessVars.TxnCtxMu.Unlock()
|
||||
if vardef.EnableMDL.Load() {
|
||||
if vardef.IsMDLEnabled() {
|
||||
sessVars.TxnCtx.EnableMDL = true
|
||||
}
|
||||
|
||||
|
||||
@ -48,7 +48,7 @@ func BenchmarkAddRecordInPipelinedDML(b *testing.B) {
|
||||
tb, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
|
||||
require.NoError(b, err)
|
||||
|
||||
vardef.EnableMDL.Store(true)
|
||||
vardef.SetEnableMDL(true)
|
||||
|
||||
// Pre-create data to be inserted
|
||||
records := make([][]types.Datum, batchSize)
|
||||
@ -102,7 +102,7 @@ func BenchmarkRemoveRecordInPipelinedDML(b *testing.B) {
|
||||
tb, err := dom.InfoSchema().TableByName(context.Background(), ast.NewCIStr("test"), ast.NewCIStr("t"))
|
||||
require.NoError(b, err)
|
||||
|
||||
vardef.EnableMDL.Store(true)
|
||||
vardef.SetEnableMDL(true)
|
||||
|
||||
// Pre-create and add initial records
|
||||
records := make([][]types.Datum, batchSize)
|
||||
|
||||
Reference in New Issue
Block a user