store: add lock for runtime stats to fix panic caused by concurrent execution (#18983)
This commit is contained in:
@ -54,6 +54,15 @@ func (s *testSuiteJoin1) TestJoinPanic(c *C) {
|
||||
tk.MustQuery("SELECT * FROM events e JOIN (SELECT MAX(clock) AS clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock")
|
||||
err := tk.ExecToErr("SELECT * FROM events e JOIN (SELECT clock FROM events e2 GROUP BY e2.source) e3 ON e3.clock=e.clock")
|
||||
c.Check(err, NotNil)
|
||||
|
||||
// Test for PR 18983, use to detect race.
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists tpj1,tpj2;")
|
||||
tk.MustExec("create table tpj1 (id int, b int, unique index (id));")
|
||||
tk.MustExec("create table tpj2 (id int, b int, unique index (id));")
|
||||
tk.MustExec("insert into tpj1 values (1,1);")
|
||||
tk.MustExec("insert into tpj2 values (1,1);")
|
||||
tk.MustQuery("select tpj1.b,tpj2.b from tpj1 left join tpj2 on tpj1.id=tpj2.id where tpj1.id=1;").Check(testkit.Rows("1 1"))
|
||||
}
|
||||
|
||||
func (s *testSuite) TestJoinInDisk(c *C) {
|
||||
|
||||
@ -74,8 +74,8 @@ type tikvSnapshot struct {
|
||||
sync.RWMutex
|
||||
hitCnt int64
|
||||
cached map[string][]byte
|
||||
stats *SnapshotRuntimeStats
|
||||
}
|
||||
stats *SnapshotRuntimeStats
|
||||
}
|
||||
|
||||
// newTiKVSnapshot creates a snapshot of an TiKV store.
|
||||
@ -238,7 +238,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
|
||||
minCommitTSPushed: &s.minCommitTSPushed,
|
||||
Client: s.store.client,
|
||||
}
|
||||
if s.stats != nil {
|
||||
if s.mu.stats != nil {
|
||||
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
|
||||
defer func() {
|
||||
s.mergeRegionRequestStats(cli.stats)
|
||||
@ -367,7 +367,7 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
|
||||
Client: s.store.client,
|
||||
resolveLite: true,
|
||||
}
|
||||
if s.stats != nil {
|
||||
if s.mu.stats != nil {
|
||||
cli.stats = make(map[tikvrpc.CmdType]*RegionRequestRuntimeStats)
|
||||
defer func() {
|
||||
s.mergeRegionRequestStats(cli.stats)
|
||||
@ -452,7 +452,9 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
|
||||
case kv.TaskID:
|
||||
s.taskID = val.(uint64)
|
||||
case kv.CollectRuntimeStats:
|
||||
s.stats = val.(*SnapshotRuntimeStats)
|
||||
s.mu.Lock()
|
||||
s.mu.stats = val.(*SnapshotRuntimeStats)
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@ -462,7 +464,9 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
|
||||
case kv.ReplicaRead:
|
||||
s.replicaRead = kv.ReplicaReadLeader
|
||||
case kv.CollectRuntimeStats:
|
||||
s.stats = nil
|
||||
s.mu.Lock()
|
||||
s.mu.stats = nil
|
||||
s.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
@ -579,35 +583,41 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
|
||||
}
|
||||
|
||||
func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
|
||||
if s.stats == nil || bo.totalSleep == 0 {
|
||||
if s.mu.stats == nil || bo.totalSleep == 0 {
|
||||
return
|
||||
}
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.stats.backoffSleepMS == nil {
|
||||
s.stats.backoffSleepMS = bo.backoffSleepMS
|
||||
s.stats.backoffTimes = bo.backoffTimes
|
||||
if s.mu.stats == nil {
|
||||
return
|
||||
}
|
||||
if s.mu.stats.backoffSleepMS == nil {
|
||||
s.mu.stats.backoffSleepMS = bo.backoffSleepMS
|
||||
s.mu.stats.backoffTimes = bo.backoffTimes
|
||||
return
|
||||
}
|
||||
for k, v := range bo.backoffSleepMS {
|
||||
s.stats.backoffSleepMS[k] += v
|
||||
s.mu.stats.backoffSleepMS[k] += v
|
||||
}
|
||||
for k, v := range bo.backoffTimes {
|
||||
s.stats.backoffTimes[k] += v
|
||||
s.mu.stats.backoffTimes[k] += v
|
||||
}
|
||||
}
|
||||
|
||||
func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RegionRequestRuntimeStats) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
if s.stats.rpcStats == nil {
|
||||
s.stats.rpcStats = stats
|
||||
if s.mu.stats == nil {
|
||||
return
|
||||
}
|
||||
if s.mu.stats.rpcStats == nil {
|
||||
s.mu.stats.rpcStats = stats
|
||||
return
|
||||
}
|
||||
for k, v := range stats {
|
||||
stat, ok := s.stats.rpcStats[k]
|
||||
stat, ok := s.mu.stats.rpcStats[k]
|
||||
if !ok {
|
||||
s.stats.rpcStats[k] = v
|
||||
s.mu.stats.rpcStats[k] = v
|
||||
continue
|
||||
}
|
||||
stat.count += v.count
|
||||
|
||||
@ -316,5 +316,5 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) {
|
||||
snapshot.recordBackoffInfo(bo)
|
||||
snapshot.recordBackoffInfo(bo)
|
||||
expect := "Get:{num_rpc:4, total_time:2.002s},txnLockFast_backoff:{num:2, total_time:60 ms}"
|
||||
c.Assert(snapshot.stats.String(), Equals, expect)
|
||||
c.Assert(snapshot.mu.stats.String(), Equals, expect)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user