executor: fix unstable test in the TestPointGetLockExistKey (#32949)
close pingcap/tidb#32948
This commit is contained in:
@ -18,7 +18,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -32,6 +31,7 @@ import (
|
||||
"github.com/pingcap/tidb/testkit"
|
||||
"github.com/pingcap/tidb/testkit/testdata"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/codec"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tikv/client-go/v2/tikv"
|
||||
@ -682,48 +682,46 @@ func TestPointGetWriteLock(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestPointGetLockExistKey(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
errCh := make(chan error)
|
||||
var wg util.WaitGroupWrapper
|
||||
|
||||
testLock := func(rc bool, key string, tableName string) {
|
||||
doneCh := make(chan struct{}, 1)
|
||||
store, clean := testkit.CreateMockStore(t)
|
||||
defer clean()
|
||||
tk1, tk2 := testkit.NewTestKit(t, store), testkit.NewTestKit(t, store)
|
||||
|
||||
errCh <- tk1.ExecToErr("use test")
|
||||
errCh <- tk2.ExecToErr("use test")
|
||||
tk1.MustExec("use test")
|
||||
tk2.MustExec("use test")
|
||||
tk1.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly
|
||||
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("drop table if exists %s", tableName))
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key))
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 1)", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("drop table if exists %s", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key))
|
||||
tk1.MustExec(fmt.Sprintf("insert into %s values(1, 1, 1)", tableName))
|
||||
|
||||
if rc {
|
||||
errCh <- tk1.ExecToErr("set tx_isolation = 'READ-COMMITTED'")
|
||||
errCh <- tk2.ExecToErr("set tx_isolation = 'READ-COMMITTED'")
|
||||
tk1.MustExec("set tx_isolation = 'READ-COMMITTED'")
|
||||
tk2.MustExec("set tx_isolation = 'READ-COMMITTED'")
|
||||
}
|
||||
|
||||
// select for update
|
||||
errCh <- tk1.ExecToErr("begin pessimistic")
|
||||
errCh <- tk2.ExecToErr("begin pessimistic")
|
||||
tk1.MustExec("begin pessimistic")
|
||||
tk2.MustExec("begin pessimistic")
|
||||
// lock exist key
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where id = 1 and v = 1 for update", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("select * from %s where id = 1 and v = 1 for update", tableName))
|
||||
// read committed will not lock non-exist key
|
||||
if rc {
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where id = 2 and v = 2 for update", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("select * from %s where id = 2 and v = 2 for update", tableName))
|
||||
}
|
||||
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(2, 2, 2)", tableName))
|
||||
go func() {
|
||||
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
|
||||
tk2.MustExec(fmt.Sprintf("insert into %s values(2, 2, 2)", tableName))
|
||||
var wg3 util.WaitGroupWrapper
|
||||
wg3.Run(func() {
|
||||
tk2.MustExec(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
|
||||
// tk2.MustExec(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName))
|
||||
doneCh <- struct{}{}
|
||||
}()
|
||||
})
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName))
|
||||
errCh <- tk1.ExecToErr("commit")
|
||||
<-doneCh
|
||||
errCh <- tk2.ExecToErr("commit")
|
||||
tk1.MustExec(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName))
|
||||
tk1.MustExec("commit")
|
||||
wg3.Wait()
|
||||
tk2.MustExec("commit")
|
||||
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
|
||||
"1 2 1",
|
||||
"2 2 2",
|
||||
@ -731,23 +729,23 @@ func TestPointGetLockExistKey(t *testing.T) {
|
||||
))
|
||||
|
||||
// update
|
||||
errCh <- tk1.ExecToErr("begin pessimistic")
|
||||
errCh <- tk2.ExecToErr("begin pessimistic")
|
||||
tk1.MustExec("begin pessimistic")
|
||||
tk2.MustExec("begin pessimistic")
|
||||
// lock exist key
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 3 where id = 2 and v = 2", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("update %s set v = 3 where id = 2 and v = 2", tableName))
|
||||
// read committed will not lock non-exist key
|
||||
if rc {
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v =4 where id = 3 and v = 3", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("update %s set v =4 where id = 3 and v = 3", tableName))
|
||||
}
|
||||
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName))
|
||||
go func() {
|
||||
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(2, 2, 20)", tableName))
|
||||
doneCh <- struct{}{}
|
||||
}()
|
||||
tk2.MustExec(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName))
|
||||
var wg2 util.WaitGroupWrapper
|
||||
wg2.Run(func() {
|
||||
tk2.MustExec(fmt.Sprintf("insert into %s values(2, 2, 20)", tableName))
|
||||
})
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
errCh <- tk1.ExecToErr("commit")
|
||||
<-doneCh
|
||||
errCh <- tk2.ExecToErr("commit")
|
||||
tk1.MustExec("commit")
|
||||
wg2.Wait()
|
||||
tk2.MustExec("commit")
|
||||
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
|
||||
"1 2 1",
|
||||
"2 3 2",
|
||||
@ -757,23 +755,23 @@ func TestPointGetLockExistKey(t *testing.T) {
|
||||
))
|
||||
|
||||
// delete
|
||||
errCh <- tk1.ExecToErr("begin pessimistic")
|
||||
errCh <- tk2.ExecToErr("begin pessimistic")
|
||||
tk1.MustExec("begin pessimistic")
|
||||
tk2.MustExec("begin pessimistic")
|
||||
// lock exist key
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where id = 3 and v = 3", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("delete from %s where id = 3 and v = 3", tableName))
|
||||
// read committed will not lock non-exist key
|
||||
if rc {
|
||||
errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where id = 4 and v = 4", tableName))
|
||||
tk1.MustExec(fmt.Sprintf("delete from %s where id = 4 and v = 4", tableName))
|
||||
}
|
||||
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName))
|
||||
go func() {
|
||||
errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName))
|
||||
doneCh <- struct{}{}
|
||||
}()
|
||||
tk2.MustExec(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName))
|
||||
var wg1 util.WaitGroupWrapper
|
||||
wg1.Run(func() {
|
||||
tk2.MustExec(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName))
|
||||
})
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
errCh <- tk1.ExecToErr("commit")
|
||||
<-doneCh
|
||||
errCh <- tk2.ExecToErr("commit")
|
||||
tk1.MustExec("commit")
|
||||
wg1.Wait()
|
||||
tk2.MustExec("commit")
|
||||
tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows(
|
||||
"1 2 1",
|
||||
"2 3 2",
|
||||
@ -782,7 +780,6 @@ func TestPointGetLockExistKey(t *testing.T) {
|
||||
"4 4 4",
|
||||
"3 3 30",
|
||||
))
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
for i, one := range []struct {
|
||||
@ -794,18 +791,12 @@ func TestPointGetLockExistKey(t *testing.T) {
|
||||
{rc: true, key: "primary key"},
|
||||
{rc: true, key: "unique key"},
|
||||
} {
|
||||
wg.Add(1)
|
||||
tableName := fmt.Sprintf("t_%d", i)
|
||||
go testLock(one.rc, one.key, tableName)
|
||||
}
|
||||
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
}()
|
||||
for err := range errCh {
|
||||
require.NoError(t, err)
|
||||
wg.Run(func() {
|
||||
testLock(one.rc, one.key, tableName)
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestWithTiDBSnapshot(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user