executor: show operators' memory consumption in results of EXPLAIN ANALYZE (#11334)

This commit is contained in:
Yuanjia Zhang
2019-07-24 10:53:02 +08:00
committed by GitHub
parent 18724b950b
commit 1ad073bf80
12 changed files with 103 additions and 45 deletions

View File

@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)
@ -42,7 +43,8 @@ func (s *testSuite) createSelectNormal(batch, totalRows int, c *C, planIDs []str
SetDesc(false).
SetKeepOrder(false).
SetFromSessionVars(variable.NewSessionVars()).
SetMemTracker(s.sctx, stringutil.StringerStr("testSuite.createSelectNormal")).
SetMemTracker(memory.NewTracker(stringutil.StringerStr("testSuite.createSelectNormal"),
s.sctx.GetSessionVars().MemQuotaDistSQL)).
Build()
c.Assert(err, IsNil)

View File

@ -14,12 +14,10 @@
package distsql
import (
"fmt"
"math"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
@ -44,10 +42,8 @@ func (builder *RequestBuilder) Build() (*kv.Request, error) {
}
// SetMemTracker sets a memTracker for this request.
func (builder *RequestBuilder) SetMemTracker(sctx sessionctx.Context, label fmt.Stringer) *RequestBuilder {
t := memory.NewTracker(label, sctx.GetSessionVars().MemQuotaDistSQL)
t.AttachTo(sctx.GetSessionVars().StmtCtx.MemTracker)
builder.Request.MemTracker = t
func (builder *RequestBuilder) SetMemTracker(tracker *memory.Tracker) *RequestBuilder {
builder.Request.MemTracker = tracker
return builder
}

View File

@ -19,6 +19,7 @@ import (
"math"
"runtime"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
@ -220,6 +221,8 @@ type IndexReaderExecutor struct {
colLens []int
plans []plannercore.PhysicalPlan
memTracker *memory.Tracker
selectResultHook // for testing
}
@ -261,8 +264,6 @@ func (e *IndexReaderExecutor) Open(ctx context.Context) error {
return e.open(ctx, kvRanges)
}
var indexReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("IndexReaderDistSQLTracker")
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
@ -277,6 +278,8 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
e.dagPB.CollectExecutionSummaries = &collExec
}
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
@ -284,7 +287,7 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, indexReaderDistSQLTrackerLabel).
SetMemTracker(e.memTracker).
Build()
if err != nil {
e.feedback.Invalidate()
@ -415,6 +418,8 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
e.dagPB.CollectExecutionSummaries = &collExec
}
tracker := memory.NewTracker(stringutil.StringerStr("IndexWorker"), e.ctx.GetSessionVars().MemQuotaIndexLookupReader)
tracker.AttachTo(e.memTracker)
var builder distsql.RequestBuilder
kvReq, err := builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
@ -422,7 +427,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
SetKeepOrder(e.keepOrder).
SetStreaming(e.indexStreaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, indexLookupDistSQLTrackerLabel).
SetMemTracker(tracker).
Build()
if err != nil {
return err
@ -471,8 +476,6 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, kvRanges []k
return nil
}
var tableWorkerLabel fmt.Stringer = stringutil.StringerStr("tableWorker")
// startTableWorker launchs some background goroutines which pick tasks from workCh and execute the task.
func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-chan *lookupTableTask) {
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
@ -486,7 +489,8 @@ func (e *IndexLookUpExecutor) startTableWorker(ctx context.Context, workCh <-cha
keepOrder: e.keepOrder,
handleIdx: e.handleIdx,
isCheckOp: e.isCheckOp,
memTracker: memory.NewTracker(tableWorkerLabel, -1),
memTracker: memory.NewTracker(stringutil.MemoizeStr(func() string { return "TableWorker_" + strconv.Itoa(i) }),
e.ctx.GetSessionVars().MemQuotaIndexLookupReader),
}
worker.memTracker.AttachTo(e.memTracker)
ctx1, cancel := context.WithCancel(ctx)
@ -531,7 +535,6 @@ func (e *IndexLookUpExecutor) Close() error {
e.tblWorkerWg.Wait()
e.finished = nil
e.workerStarted = false
e.memTracker.Detach()
e.memTracker = nil
if e.runtimeStats != nil {
copStats := e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(e.idxPlans[0].ExplainID().String())

View File

@ -14,6 +14,8 @@
package executor_test
import (
"strings"
. "github.com/pingcap/check"
"github.com/pingcap/parser/auth"
plannercore "github.com/pingcap/tidb/planner/core"
@ -74,3 +76,50 @@ func (s *testSuite1) TestExplainWrite(c *C) {
tk.MustExec("explain analyze insert into t select 1")
tk.MustQuery("select * from t order by a").Check(testkit.Rows("1", "2"))
}
func (s *testSuite1) TestExplainAnalyzeMemory(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (v int, k int, key(k))")
tk.MustExec("insert into t values (1, 1), (1, 1), (1, 1), (1, 1), (1, 1)")
s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v")
s.checkMemoryInfo(c, tk, "explain analyze select * from t order by v limit 5")
s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_HJ(t1, t2) */ t1.k from t t1, t t2 where t1.v = t2.v+1")
s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_SMJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k+1")
s.checkMemoryInfo(c, tk, "explain analyze select /*+ TIDB_INLJ(t1, t2) */ t1.k from t t1, t t2 where t1.k = t2.k and t1.v=1")
s.checkMemoryInfo(c, tk, "explain analyze select sum(k) from t group by v")
s.checkMemoryInfo(c, tk, "explain analyze select sum(v) from t group by k")
s.checkMemoryInfo(c, tk, "explain analyze select * from t")
s.checkMemoryInfo(c, tk, "explain analyze select k from t use index(k)")
s.checkMemoryInfo(c, tk, "explain analyze select * from t use index(k)")
}
func (s *testSuite1) checkMemoryInfo(c *C, tk *testkit.TestKit, sql string) {
memCol := 5
ops := []string{"Join", "Reader", "Top", "Sort", "LookUp"}
rows := tk.MustQuery(sql).Rows()
for _, row := range rows {
strs := make([]string, len(row))
for i, c := range row {
strs[i] = c.(string)
}
if strings.Contains(strs[2], "cop") {
continue
}
shouldHasMem := false
for _, op := range ops {
if strings.Contains(strs[0], op) {
shouldHasMem = true
break
}
}
if shouldHasMem {
c.Assert(strs[memCol], Not(Equals), "N/A")
} else {
c.Assert(strs[memCol], Equals, "N/A")
}
}
}

View File

@ -295,9 +295,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
return nil, nil
}
if e.task != nil {
e.task.memTracker.Detach()
}
e.task = task
return task, nil
}
@ -650,7 +647,6 @@ func (e *IndexLookUpJoin) Close() error {
e.cancelFunc()
}
e.workerWg.Wait()
e.memTracker.Detach()
e.memTracker = nil
return e.children[0].Close()
}

View File

@ -134,7 +134,6 @@ func (e *HashJoinExec) Close() error {
e.outerChkResourceCh = nil
e.joinChkResourceCh = nil
}
e.memTracker.Detach()
e.memTracker = nil
err := e.baseExecutor.Close()
@ -633,7 +632,6 @@ type NestedLoopApplyExec struct {
func (e *NestedLoopApplyExec) Close() error {
e.innerRows = nil
e.memTracker.Detach()
e.memTracker = nil
return e.outerExec.Close()
}

View File

@ -200,7 +200,6 @@ func (t *mergeJoinInnerTable) reallocReaderResult() {
// Close implements the Executor Close interface.
func (e *MergeJoinExec) Close() error {
e.memTracker.Detach()
e.childrenResults = nil
e.memTracker = nil

View File

@ -54,7 +54,6 @@ type SortExec struct {
// Close implements the Executor Close interface.
func (e *SortExec) Close() error {
e.memTracker.Detach()
e.memTracker = nil
return e.children[0].Close()
}

View File

@ -26,8 +26,8 @@ import (
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tipb/go-tipb"
)
@ -72,11 +72,16 @@ type TableReaderExecutor struct {
corColInAccess bool
plans []plannercore.PhysicalPlan
memTracker *memory.Tracker
selectResultHook // for testing
}
// Open initialzes necessary variables for using this executor.
func (e *TableReaderExecutor) Open(ctx context.Context) error {
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaDistSQL)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
var err error
if e.corColInFilter {
e.dagPB.Executors, _, err = constructDistExec(e.ctx, e.plans)
@ -148,8 +153,6 @@ func (e *TableReaderExecutor) Close() error {
return err
}
var tableReaderDistSQLTrackerLabel fmt.Stringer = stringutil.StringerStr("TableReaderDistSQLTracker")
// buildResp first builds request and sends it to tikv using distsql.Select. It uses SelectResut returned by the callee
// to fetch all results.
func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Range) (distsql.SelectResult, error) {
@ -160,7 +163,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
SetKeepOrder(e.keepOrder).
SetStreaming(e.streaming).
SetFromSessionVars(e.ctx.GetSessionVars()).
SetMemTracker(e.ctx, tableReaderDistSQLTrackerLabel).
SetMemTracker(e.memTracker).
Build()
if err != nil {
return nil, err

View File

@ -80,7 +80,7 @@ func (s *testAnalyzeSuite) TestExplainAnalyze(c *C) {
rs := tk.MustQuery("explain analyze select t1.a, t1.b, sum(t1.c) from t1 join t2 on t1.a = t2.b where t1.a > 1")
c.Assert(len(rs.Rows()), Equals, 10)
for _, row := range rs.Rows() {
c.Assert(len(row), Equals, 5)
c.Assert(len(row), Equals, 6)
execInfo := row[4].(string)
c.Assert(strings.Contains(execInfo, "time"), Equals, true)
c.Assert(strings.Contains(execInfo, "loops"), Equals, true)
@ -977,7 +977,7 @@ func (s *testAnalyzeSuite) TestIssue9805(c *C) {
c.Assert(rs.Rows(), HasLen, 10)
hasIndexLookUp12 := false
for _, row := range rs.Rows() {
c.Assert(row, HasLen, 5)
c.Assert(row, HasLen, 6)
if strings.HasSuffix(row[0].(string), "IndexLookUp_12") {
hasIndexLookUp12 = true
c.Assert(row[4], Equals, "time:0ns, loops:0, rows:0")

View File

@ -563,7 +563,7 @@ func (e *Explain) prepareSchema() error {
case ast.ExplainFormatROW:
retFields := []string{"id", "count", "task", "operator info"}
if e.Analyze {
retFields = append(retFields, "execution info")
retFields = append(retFields, "execution info", "memory")
}
schema := expression.NewSchema(make([]*expression.Column, 0, len(retFields))...)
for _, fieldName := range retFields {
@ -643,6 +643,13 @@ func (e *Explain) prepareOperatorInfo(p PhysicalPlan, taskType string, indent st
} else {
row = append(row, "time:0ns, loops:0, rows:0")
}
tracker := e.ctx.GetSessionVars().StmtCtx.MemTracker.SearchTracker(p.ExplainID().String())
if tracker != nil {
row = append(row, tracker.BytesToString(tracker.MaxConsumed()))
} else {
row = append(row, "N/A")
}
}
e.Rows = append(e.Rows, row)
}

View File

@ -87,11 +87,6 @@ func (t *Tracker) AttachTo(parent *Tracker) {
t.parent.Consume(t.BytesConsumed())
}
// Detach detaches this Tracker from its parent.
func (t *Tracker) Detach() {
t.parent.remove(t)
}
func (t *Tracker) remove(oldChild *Tracker) {
t.mu.Lock()
defer t.mu.Unlock()
@ -144,17 +139,13 @@ func (t *Tracker) Consume(bytes int64) {
rootExceed = tracker
}
if tracker.parent == nil {
// since we only need a total memory usage during execution,
// we only record max consumed bytes in root(statement-level) for performance.
for {
maxNow := atomic.LoadInt64(&tracker.maxConsumed)
consumed := atomic.LoadInt64(&tracker.bytesConsumed)
if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) {
continue
}
break
for {
maxNow := atomic.LoadInt64(&tracker.maxConsumed)
consumed := atomic.LoadInt64(&tracker.bytesConsumed)
if consumed > maxNow && !atomic.CompareAndSwapInt64(&tracker.maxConsumed, maxNow, consumed) {
continue
}
break
}
}
if rootExceed != nil {
@ -172,6 +163,21 @@ func (t *Tracker) MaxConsumed() int64 {
return atomic.LoadInt64(&t.maxConsumed)
}
// SearchTracker searches the specific tracker under this tracker.
func (t *Tracker) SearchTracker(label string) *Tracker {
if t.label.String() == label {
return t
}
t.mu.Lock()
defer t.mu.Unlock()
for _, child := range t.mu.children {
if result := child.SearchTracker(label); result != nil {
return result
}
}
return nil
}
// String returns the string representation of this Tracker tree.
func (t *Tracker) String() string {
buffer := bytes.NewBufferString("\n")