From b88d62ecbbad8c4eef6e4aa31a67d7084577efa3 Mon Sep 17 00:00:00 2001 From: Weizhen Wang Date: Wed, 9 Mar 2022 18:27:50 +0800 Subject: [PATCH] executor: fix unstable test in the TestPointGetLockExistKey (#32949) close pingcap/tidb#32948 --- executor/point_get_test.go | 109 +++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 59 deletions(-) diff --git a/executor/point_get_test.go b/executor/point_get_test.go index ad412010f8..64f50e12df 100644 --- a/executor/point_get_test.go +++ b/executor/point_get_test.go @@ -18,7 +18,6 @@ import ( "context" "fmt" "strings" - "sync" "testing" "time" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/testkit/testdata" "github.com/pingcap/tidb/types" + "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/tikv" @@ -682,48 +682,46 @@ func TestPointGetWriteLock(t *testing.T) { } func TestPointGetLockExistKey(t *testing.T) { - var wg sync.WaitGroup - errCh := make(chan error) + var wg util.WaitGroupWrapper testLock := func(rc bool, key string, tableName string) { - doneCh := make(chan struct{}, 1) store, clean := testkit.CreateMockStore(t) defer clean() tk1, tk2 := testkit.NewTestKit(t, store), testkit.NewTestKit(t, store) - errCh <- tk1.ExecToErr("use test") - errCh <- tk2.ExecToErr("use test") + tk1.MustExec("use test") + tk2.MustExec("use test") tk1.Session().GetSessionVars().EnableClusteredIndex = variable.ClusteredIndexDefModeIntOnly - errCh <- tk1.ExecToErr(fmt.Sprintf("drop table if exists %s", tableName)) - errCh <- tk1.ExecToErr(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key)) - errCh <- tk1.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 1)", tableName)) + tk1.MustExec(fmt.Sprintf("drop table if exists %s", tableName)) + tk1.MustExec(fmt.Sprintf("create table %s(id int, v int, k int, %s key0(id, v))", tableName, key)) + tk1.MustExec(fmt.Sprintf("insert into %s values(1, 1, 1)", tableName)) if rc { - errCh <- tk1.ExecToErr("set tx_isolation = 'READ-COMMITTED'") - errCh <- tk2.ExecToErr("set tx_isolation = 'READ-COMMITTED'") + tk1.MustExec("set tx_isolation = 'READ-COMMITTED'") + tk2.MustExec("set tx_isolation = 'READ-COMMITTED'") } // select for update - errCh <- tk1.ExecToErr("begin pessimistic") - errCh <- tk2.ExecToErr("begin pessimistic") + tk1.MustExec("begin pessimistic") + tk2.MustExec("begin pessimistic") // lock exist key - errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where id = 1 and v = 1 for update", tableName)) + tk1.MustExec(fmt.Sprintf("select * from %s where id = 1 and v = 1 for update", tableName)) // read committed will not lock non-exist key if rc { - errCh <- tk1.ExecToErr(fmt.Sprintf("select * from %s where id = 2 and v = 2 for update", tableName)) + tk1.MustExec(fmt.Sprintf("select * from %s where id = 2 and v = 2 for update", tableName)) } - errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(2, 2, 2)", tableName)) - go func() { - errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName)) + tk2.MustExec(fmt.Sprintf("insert into %s values(2, 2, 2)", tableName)) + var wg3 util.WaitGroupWrapper + wg3.Run(func() { + tk2.MustExec(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName)) // tk2.MustExec(fmt.Sprintf("insert into %s values(1, 1, 10)", tableName)) - doneCh <- struct{}{} - }() + }) time.Sleep(150 * time.Millisecond) - errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName)) - errCh <- tk1.ExecToErr("commit") - <-doneCh - errCh <- tk2.ExecToErr("commit") + tk1.MustExec(fmt.Sprintf("update %s set v = 2 where id = 1 and v = 1", tableName)) + tk1.MustExec("commit") + wg3.Wait() + tk2.MustExec("commit") tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows( "1 2 1", "2 2 2", @@ -731,23 +729,23 @@ func TestPointGetLockExistKey(t *testing.T) { )) // update - errCh <- tk1.ExecToErr("begin pessimistic") - errCh <- tk2.ExecToErr("begin pessimistic") + tk1.MustExec("begin pessimistic") + tk2.MustExec("begin pessimistic") // lock exist key - errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v = 3 where id = 2 and v = 2", tableName)) + tk1.MustExec(fmt.Sprintf("update %s set v = 3 where id = 2 and v = 2", tableName)) // read committed will not lock non-exist key if rc { - errCh <- tk1.ExecToErr(fmt.Sprintf("update %s set v =4 where id = 3 and v = 3", tableName)) + tk1.MustExec(fmt.Sprintf("update %s set v =4 where id = 3 and v = 3", tableName)) } - errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName)) - go func() { - errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(2, 2, 20)", tableName)) - doneCh <- struct{}{} - }() + tk2.MustExec(fmt.Sprintf("insert into %s values(3, 3, 3)", tableName)) + var wg2 util.WaitGroupWrapper + wg2.Run(func() { + tk2.MustExec(fmt.Sprintf("insert into %s values(2, 2, 20)", tableName)) + }) time.Sleep(150 * time.Millisecond) - errCh <- tk1.ExecToErr("commit") - <-doneCh - errCh <- tk2.ExecToErr("commit") + tk1.MustExec("commit") + wg2.Wait() + tk2.MustExec("commit") tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows( "1 2 1", "2 3 2", @@ -757,23 +755,23 @@ func TestPointGetLockExistKey(t *testing.T) { )) // delete - errCh <- tk1.ExecToErr("begin pessimistic") - errCh <- tk2.ExecToErr("begin pessimistic") + tk1.MustExec("begin pessimistic") + tk2.MustExec("begin pessimistic") // lock exist key - errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where id = 3 and v = 3", tableName)) + tk1.MustExec(fmt.Sprintf("delete from %s where id = 3 and v = 3", tableName)) // read committed will not lock non-exist key if rc { - errCh <- tk1.ExecToErr(fmt.Sprintf("delete from %s where id = 4 and v = 4", tableName)) + tk1.MustExec(fmt.Sprintf("delete from %s where id = 4 and v = 4", tableName)) } - errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName)) - go func() { - errCh <- tk2.ExecToErr(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName)) - doneCh <- struct{}{} - }() + tk2.MustExec(fmt.Sprintf("insert into %s values(4, 4, 4)", tableName)) + var wg1 util.WaitGroupWrapper + wg1.Run(func() { + tk2.MustExec(fmt.Sprintf("insert into %s values(3, 3, 30)", tableName)) + }) time.Sleep(50 * time.Millisecond) - errCh <- tk1.ExecToErr("commit") - <-doneCh - errCh <- tk2.ExecToErr("commit") + tk1.MustExec("commit") + wg1.Wait() + tk2.MustExec("commit") tk1.MustQuery(fmt.Sprintf("select * from %s", tableName)).Check(testkit.Rows( "1 2 1", "2 3 2", @@ -782,7 +780,6 @@ func TestPointGetLockExistKey(t *testing.T) { "4 4 4", "3 3 30", )) - wg.Done() } for i, one := range []struct { @@ -794,18 +791,12 @@ func TestPointGetLockExistKey(t *testing.T) { {rc: true, key: "primary key"}, {rc: true, key: "unique key"}, } { - wg.Add(1) tableName := fmt.Sprintf("t_%d", i) - go testLock(one.rc, one.key, tableName) - } - - go func() { - wg.Wait() - close(errCh) - }() - for err := range errCh { - require.NoError(t, err) + wg.Run(func() { + testLock(one.rc, one.key, tableName) + }) } + wg.Wait() } func TestWithTiDBSnapshot(t *testing.T) {