executor: fix issue of runtime stats of index merge join is wrong (#20892)

Signed-off-by: crazycs520 <crazycs520@gmail.com>
This commit is contained in:
crazycs
2020-11-09 23:27:54 +08:00
committed by GitHub
parent 8bcbdf0aba
commit f140db6fce
2 changed files with 11 additions and 6 deletions

View File

@ -189,6 +189,12 @@ func (e *IndexLookUpMergeJoin) startWorkers(ctx context.Context) {
// TODO: consider another session currency variable for index merge join.
// Because its parallelization is not complete.
concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency()
if e.runtimeStats != nil {
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
resultCh := make(chan *lookUpMergeJoinTask, concurrency)
e.resultCh = resultCh
e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency)
@ -741,11 +747,5 @@ func (e *IndexLookUpMergeJoin) Close() error {
// cancelFunc control the outer worker and outer worker close the task channel.
e.workerWg.Wait()
e.memTracker = nil
if e.runtimeStats != nil {
concurrency := cap(e.resultCh)
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency))
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return e.baseExecutor.Close()
}

View File

@ -2321,6 +2321,11 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) {
c.Assert(len(rows), Equals, 7)
c.Assert(rows[0][0], Matches, "HashJoin.*")
c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}")
// Test for index merge join.
rows = tk.MustQuery("explain analyze select /*+ INL_MERGE_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows()
c.Assert(len(rows), Equals, 9)
c.Assert(rows[0][0], Matches, "IndexMergeJoin_.*")
c.Assert(rows[0][5], Matches, fmt.Sprintf(".*Concurrency:%v.*", tk.Se.GetSessionVars().IndexLookupJoinConcurrency()))
}
func (s *testSuiteJoinSerial) TestIssue20270(c *C) {