diff --git a/executor/join_test.go b/executor/join_test.go index 05bc552575..e71568fbc1 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -54,6 +54,15 @@ func (s *testSuiteJoin1) TestJoinPanic(c *C) { tk.MustQuery("SELECT * FROM events e JOIN (SELECT MAX(clock) AS clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock") err := tk.ExecToErr("SELECT * FROM events e JOIN (SELECT clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock") c.Check(err, NotNil) + + // Test for PR 18983, use to detect race. + tk.MustExec("use test") + tk.MustExec("drop table if exists tpj1,tpj2;") + tk.MustExec("create table tpj1 (id int, b int, unique index (id));") + tk.MustExec("create table tpj2 (id int, b int, unique index (id));") + tk.MustExec("insert into tpj1 values (1,1);") + tk.MustExec("insert into tpj2 values (1,1);") + tk.MustQuery("select tpj1.b,tpj2.b from tpj1 left join tpj2 on tpj1.id=tpj2.id where tpj1.id=1;").Check(testkit.Rows("1 1")) } func (s *testSuite) TestJoinInDisk(c *C) { diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index d576de9cc3..1756e509d6 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -74,8 +74,8 @@ type tikvSnapshot struct { sync.RWMutex hitCnt int64 cached map[string][]byte + stats *SnapshotRuntimeStats } - stats *SnapshotRuntimeStats } // newTiKVSnapshot creates a snapshot of an TiKV store. @@ -238,7 +238,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll minCommitTSPushed: &s.minCommitTSPushed, Client: s.store.client, } - if s.stats != nil { + if s.mu.stats != nil { cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) defer func() { s.mergeRegionRequestStats(cli.stats) @@ -367,7 +367,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { Client: s.store.client, resolveLite: true, } - if s.stats != nil { + if s.mu.stats != nil { cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats) defer func() { s.mergeRegionRequestStats(cli.stats) @@ -452,7 +452,9 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) { case kv.TaskID: s.taskID = val.(uint64) case kv.CollectRuntimeStats: - s.stats = val.(*SnapshotRuntimeStats) + s.mu.Lock() + s.mu.stats = val.(*SnapshotRuntimeStats) + s.mu.Unlock() } } @@ -462,7 +464,9 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) { case kv.ReplicaRead: s.replicaRead = kv.ReplicaReadLeader case kv.CollectRuntimeStats: - s.stats = nil + s.mu.Lock() + s.mu.stats = nil + s.mu.Unlock() } } @@ -579,35 +583,41 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) { } func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) { - if s.stats == nil || bo.totalSleep == 0 { + if s.mu.stats == nil || bo.totalSleep == 0 { return } s.mu.Lock() defer s.mu.Unlock() - if s.stats.backoffSleepMS == nil { - s.stats.backoffSleepMS = bo.backoffSleepMS - s.stats.backoffTimes = bo.backoffTimes + if s.mu.stats == nil { + return + } + if s.mu.stats.backoffSleepMS == nil { + s.mu.stats.backoffSleepMS = bo.backoffSleepMS + s.mu.stats.backoffTimes = bo.backoffTimes return } for k, v := range bo.backoffSleepMS { - s.stats.backoffSleepMS[k] += v + s.mu.stats.backoffSleepMS[k] += v } for k, v := range bo.backoffTimes { - s.stats.backoffTimes[k] += v + s.mu.stats.backoffTimes[k] += v } } func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) { s.mu.Lock() defer s.mu.Unlock() - if s.stats.rpcStats == nil { - s.stats.rpcStats = stats + if s.mu.stats == nil { + return + } + if s.mu.stats.rpcStats == nil { + s.mu.stats.rpcStats = stats return } for k, v := range stats { - stat, ok := s.stats.rpcStats[k] + stat, ok := s.mu.stats.rpcStats[k] if !ok { - s.stats.rpcStats[k] = v + s.mu.stats.rpcStats[k] = v continue } stat.count += v.count diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 2d82765bec..ce1a864c54 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -316,5 +316,5 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { snapshot.recordBackoffInfo(bo) snapshot.recordBackoffInfo(bo) expect := "Get:{num_rpc:4, total_time:2.002s},txnLockFast_backoff:{num:2, total_time:60 ms}" - c.Assert(snapshot.stats.String(), Equals, expect) + c.Assert(snapshot.mu.stats.String(), Equals, expect) }