diff --git a/executor/index_lookup_merge_join.go b/executor/index_lookup_merge_join.go index fabc5a9b34..964dd39de0 100644 --- a/executor/index_lookup_merge_join.go +++ b/executor/index_lookup_merge_join.go @@ -189,6 +189,12 @@ func (e *IndexLookUpMergeJoin) startWorkers(ctx context.Context) { // TODO: consider another session currency variable for index merge join. // Because its parallelization is not complete. concurrency := e.ctx.GetSessionVars().IndexLookupJoinConcurrency() + if e.runtimeStats != nil { + runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} + runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) + } + resultCh := make(chan *lookUpMergeJoinTask, concurrency) e.resultCh = resultCh e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency) @@ -741,11 +747,5 @@ func (e *IndexLookUpMergeJoin) Close() error { // cancelFunc control the outer worker and outer worker close the task channel. e.workerWg.Wait() e.memTracker = nil - if e.runtimeStats != nil { - concurrency := cap(e.resultCh) - runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{} - runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", concurrency)) - e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats) - } return e.baseExecutor.Close() } diff --git a/executor/join_test.go b/executor/join_test.go index 3492b5a4a3..b8c38331bd 100644 --- a/executor/join_test.go +++ b/executor/join_test.go @@ -2321,6 +2321,11 @@ func (s *testSuiteJoinSerial) TestExplainAnalyzeJoin(c *C) { c.Assert(len(rows), Equals, 7) c.Assert(rows[0][0], Matches, "HashJoin.*") c.Assert(rows[0][5], Matches, "time:.*, loops:.*, build_hash_table:{total:.*, fetch:.*, build:.*}, probe:{concurrency:5, total:.*, max:.*, probe:.*, fetch:.*}") + // Test for index merge join. + rows = tk.MustQuery("explain analyze select /*+ INL_MERGE_JOIN(t1, t2) */ * from t1,t2 where t1.a=t2.a;").Rows() + c.Assert(len(rows), Equals, 9) + c.Assert(rows[0][0], Matches, "IndexMergeJoin_.*") + c.Assert(rows[0][5], Matches, fmt.Sprintf(".*Concurrency:%v.*", tk.Se.GetSessionVars().IndexLookupJoinConcurrency())) } func (s *testSuiteJoinSerial) TestIssue20270(c *C) {