Files
tidb/pkg/ddl/table_modify_test.go

186 lines
6.9 KiB
Go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl_test
import (
"context"
"testing"
"time"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/session/sessionapi"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
)
const tableModifyLease = 600 * time.Millisecond
func TestLockTableReadOnly(t *testing.T) {
store := testkit.CreateMockStoreWithSchemaLease(t, tableModifyLease)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("drop table if exists t1,t2")
defer func() {
tk1.MustExec("alter table t1 read write")
tk1.MustExec("alter table t2 read write")
tk1.MustExec("drop table if exists t1,t2")
}()
tk1.MustExec("create table t1 (a int key, b int)")
tk1.MustExec("create table t2 (a int key)")
tk1.MustExec("alter table t1 read only")
tk1.MustQuery("select * from t1")
tk2.MustQuery("select * from t1")
require.True(t, terror.ErrorEqual(tk1.ExecToErr("insert into t1 set a=1, b=2"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk1.ExecToErr("update t1 set a=1"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk1.ExecToErr("delete from t1"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("insert into t1 set a=1, b=2"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("update t1 set a=1"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("delete from t1"), infoschema.ErrTableLocked))
tk2.MustExec("alter table t1 read only")
require.True(t, terror.ErrorEqual(tk2.ExecToErr("insert into t1 set a=1, b=2"), infoschema.ErrTableLocked))
tk1.MustExec("alter table t1 read write")
tk1.MustExec("lock tables t1 read")
require.True(t, terror.ErrorEqual(tk1.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked))
tk1.MustExec("lock tables t1 write")
require.True(t, terror.ErrorEqual(tk1.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked))
tk1.MustExec("lock tables t1 write local")
require.True(t, terror.ErrorEqual(tk1.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("alter table t1 read only"), infoschema.ErrTableLocked))
tk1.MustExec("unlock tables")
tk1.MustExec("alter table t1 read only")
require.True(t, terror.ErrorEqual(tk1.ExecToErr("lock tables t1 read"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("lock tables t1 read"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk1.ExecToErr("lock tables t1 write"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("lock tables t1 write"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk1.ExecToErr("lock tables t1 write local"), infoschema.ErrTableLocked))
require.True(t, terror.ErrorEqual(tk2.ExecToErr("lock tables t1 write local"), infoschema.ErrTableLocked))
tk1.MustExec("admin cleanup table lock t1")
tk2.MustExec("insert into t1 set a=1, b=2")
}
// TestConcurrentLockTables test concurrent lock/unlock tables.
func TestConcurrentLockTables(t *testing.T) {
store := testkit.CreateMockStoreWithSchemaLease(t, tableModifyLease)
tk1 := testkit.NewTestKit(t, store)
tk2 := testkit.NewTestKit(t, store)
tk1.MustExec("use test")
tk2.MustExec("use test")
tk1.MustExec("create table t1 (a int)")
// Test concurrent lock tables read.
sql1 := "lock tables t1 read"
sql2 := "lock tables t1 read"
testParallelExecSQL(t, store, sql1, sql2, tk1.Session(), tk2.Session(), func(t *testing.T, err1, err2 error) {
require.NoError(t, err1)
require.NoError(t, err2)
})
tk1.MustExec("unlock tables")
tk2.MustExec("unlock tables")
// Test concurrent lock tables write.
sql1 = "lock tables t1 write"
sql2 = "lock tables t1 write"
testParallelExecSQL(t, store, sql1, sql2, tk1.Session(), tk2.Session(), func(t *testing.T, err1, err2 error) {
require.NoError(t, err1)
require.True(t, terror.ErrorEqual(err2, infoschema.ErrTableLocked))
})
tk1.MustExec("unlock tables")
tk2.MustExec("unlock tables")
// Test concurrent lock tables write local.
sql1 = "lock tables t1 write local"
sql2 = "lock tables t1 write local"
testParallelExecSQL(t, store, sql1, sql2, tk1.Session(), tk2.Session(), func(t *testing.T, err1, err2 error) {
require.NoError(t, err1)
require.True(t, terror.ErrorEqual(err2, infoschema.ErrTableLocked))
})
tk1.MustExec("unlock tables")
tk2.MustExec("unlock tables")
}
func testParallelExecSQL(t *testing.T, store kv.Storage, sql1, sql2 string, se1, se2 sessionapi.Session, f func(t *testing.T, err1, err2 error)) {
times := 0
ctx := context.Background()
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
if times != 0 {
return
}
var qLen int
for {
sess := testkit.NewTestKit(t, store).Session()
err := sessiontxn.NewTxn(ctx, sess)
require.NoError(t, err)
jobs, err := ddl.GetAllDDLJobs(ctx, sess)
require.NoError(t, err)
qLen = len(jobs)
if qLen == 2 {
break
}
time.Sleep(5 * time.Millisecond)
}
times++
})
defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep")
var wg util.WaitGroupWrapper
var err1 error
var err2 error
ch := make(chan struct{})
// Make sure the sql1 is put into the DDLJobQueue.
go func() {
var qLen int
for {
sess := testkit.NewTestKit(t, store).Session()
err := sessiontxn.NewTxn(ctx, sess)
require.NoError(t, err)
jobs, err := ddl.GetAllDDLJobs(ctx, sess)
require.NoError(t, err)
qLen = len(jobs)
if qLen == 1 {
// Make sure sql2 is executed after the sql1.
close(ch)
break
}
time.Sleep(5 * time.Millisecond)
}
}()
wg.Run(func() {
_, err1 = se1.Execute(context.Background(), sql1)
})
wg.Run(func() {
<-ch
_, err2 = se2.Execute(context.Background(), sql2)
})
wg.Wait()
f(t, err1, err2)
}