diff --git a/plan/analyze_test.go b/plan/analyze_test.go index 2c5f362f52..fe8ecc22da 100644 --- a/plan/analyze_test.go +++ b/plan/analyze_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/plan" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" ) @@ -32,13 +33,18 @@ type testAnalyzeSuite struct { } func (s *testAnalyzeSuite) TestAnalyze(c *C) { + plan.UseDAGPlanBuilder = true defer func() { + plan.UseDAGPlanBuilder = false testleak.AfterTest(c)() }() store, err := newStoreWithBootstrap() c.Assert(err, IsNil) - defer store.Close() testKit := testkit.NewTestKit(c, store) + defer func() { + testKit.MustExec("drop table t, t1, t2") + store.Close() + }() testKit.MustExec("use test") testKit.MustExec("create table t (a int, b int)") testKit.MustExec("create index a on t (a)") @@ -64,31 +70,36 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) { // Test analyze full table. { sql: "select * from t where t.a <= 2", - best: "Table(t)", + best: "TableReader(Table(t)->Sel([le(test.t.a, 2)]))", + }, + { + sql: "select * from t where t.a < 2", + best: "IndexLookUp(Index(t.a)[[-inf,2)], Table(t))", }, { sql: "select * from t where t.a = 1 and t.b <= 2", - best: "Index(t.b)[[-inf,2]]", + best: "IndexLookUp(Index(t.b)[[-inf,2]], Table(t)->Sel([eq(test.t.a, 1)]))", }, // Test not analyzed table. { sql: "select * from t1 where t1.a <= 2", - best: "Index(t1.a)[[-inf,2]]", + best: "IndexLookUp(Index(t1.a)[[-inf,2]], Table(t1))", }, { sql: "select * from t1 where t1.a = 1 and t1.b <= 2", - best: "Index(t1.a)[[1,1]]", + best: "IndexLookUp(Index(t1.a)[[1,1]], Table(t1)->Sel([le(test.t1.b, 2)]))", }, // Test analyze single index. { sql: "select * from t2 where t2.a <= 2", // This is not the best because the histogram for index b is pseudo, then the row count calculated for such // a small table is always tableRowCount/3, so the cost is smaller. 
- best: "Index(t2.b)[[,+inf]]", + // FIXME: Fix it after implementing selectivity estimation for normal column. + best: "IndexLookUp(Index(t2.b)[[,+inf]], Table(t2)->Sel([le(test.t2.a, 2)]))", }, { sql: "select * from t2 where t2.a = 1 and t2.b <= 2", - best: "Index(t2.b)[[-inf,2]]", + best: "IndexLookUp(Index(t2.b)[[-inf,2]], Table(t2)->Sel([eq(test.t2.a, 1)]))", }, } for _, tt := range tests { @@ -108,10 +119,11 @@ func (s *testAnalyzeSuite) TestAnalyze(c *C) { } func newStoreWithBootstrap() (kv.Storage, error) { - store, err := tidb.NewStore(tidb.EngineGoLevelDBMemory) + store, err := tikv.NewMockTikvStore("") if err != nil { return nil, errors.Trace(err) } + tidb.SetSchemaLease(0) _, err = tidb.BootstrapSession(store) return store, errors.Trace(err) } diff --git a/plan/new_physical_plan_builder.go b/plan/new_physical_plan_builder.go index 63ed15867b..b5afb7fdcc 100644 --- a/plan/new_physical_plan_builder.go +++ b/plan/new_physical_plan_builder.go @@ -27,10 +27,20 @@ import ( "github.com/pingcap/tidb/util/types" ) +// wholeTaskTypes records all possible kinds of task that a plan can return. For Agg, TopN and Limit, we will try to get +// these tasks one by one. +var wholeTaskTypes = [...]taskType{rootTaskType, copSingleReadTaskType, copDoubleReadTaskType} + +var invalidTask = &rootTaskProfile{cst: math.MaxFloat64} + func (p *requiredProp) enforceProperty(task taskProfile, ctx context.Context, allocator *idAllocator) taskProfile { if p.isEmpty() { return task } + // If the task is invalid, leave it unchanged. + if task.plan() == nil { + return task + } sort := Sort{ByItems: make([]*ByItems, 0, len(p.cols))}.init(allocator, ctx) for _, col := range p.cols { sort.ByItems = append(sort.ByItems, &ByItems{col, p.desc}) } @@ -43,7 +53,7 @@ func (p *requiredProp) enforceProperty(task taskProfile, ctx context.Context, al // When a sort column will be replaced by scalar function, we refuse it. // When a sort column will be replaced by a constant, we just remove it. 
func (p *Projection) getPushedProp(prop *requiredProp) (*requiredProp, bool) { - newProp := &requiredProp{} + newProp := &requiredProp{taskTp: rootTaskType} if prop.isEmpty() { return newProp, false } @@ -77,6 +87,10 @@ func (p *Projection) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, e if task != nil { return task, nil } + if prop.taskTp != rootTaskType { + // Projection cannot be pushed down currently, it can only return rootTask. + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } // enforceProperty task. task, err = p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{}) if err != nil { @@ -109,6 +123,10 @@ func (p *LogicalJoin) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, if task != nil { return task, nil } + if prop.taskTp != rootTaskType { + // Join cannot be pushed down currently, it can only return rootTask. + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } switch p.JoinType { case SemiJoin, LeftOuterSemiJoin: task, err = p.convert2SemiJoin(prop) @@ -144,13 +162,13 @@ func (p *LogicalJoin) convert2MergeJoin(prop *requiredProp) (taskProfile, error) }.init(p.allocator, p.ctx) mergeJoin.SetSchema(p.schema) lJoinKey := p.EqualConditions[0].GetArgs()[0].(*expression.Column) - lProp := &requiredProp{cols: []*expression.Column{lJoinKey}} + lProp := &requiredProp{cols: []*expression.Column{lJoinKey}, taskTp: rootTaskType} lTask, err := lChild.convert2NewPhysicalPlan(lProp) if err != nil { return nil, errors.Trace(err) } rJoinKey := p.EqualConditions[0].GetArgs()[1].(*expression.Column) - rProp := &requiredProp{cols: []*expression.Column{rJoinKey}} + rProp := &requiredProp{cols: []*expression.Column{rJoinKey}, taskTp: rootTaskType} rTask, err := rChild.convert2NewPhysicalPlan(rProp) if err != nil { return nil, errors.Trace(err) @@ -178,11 +196,11 @@ func (p *LogicalJoin) convert2SemiJoin(prop *requiredProp) (taskProfile, error) Anti: p.anti, }.init(p.allocator, p.ctx) semiJoin.SetSchema(p.schema) - 
lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{}) + lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) if err != nil { return nil, errors.Trace(err) } - rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{}) + rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) if err != nil { return nil, errors.Trace(err) } @@ -206,11 +224,11 @@ func (p *LogicalJoin) convert2HashJoin(prop *requiredProp) (taskProfile, error) DefaultValues: p.DefaultValues, }.init(p.allocator, p.ctx) hashJoin.SetSchema(p.schema) - lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{}) + lTask, err := lChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) if err != nil { return nil, errors.Trace(err) } - rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{}) + rTask, err := rChild.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) if err != nil { return nil, errors.Trace(err) } @@ -232,7 +250,7 @@ func (p *LogicalJoin) convert2HashJoin(prop *requiredProp) (taskProfile, error) // getPropByOrderByItems will check if this sort property can be pushed or not. In order to simplify the problem, we only // consider the case that all expression are columns and all of them are asc or desc. -func getPropByOrderByItems(items []*ByItems) (*requiredProp, bool) { +func getPropByOrderByItems(items []*ByItems, taskTp taskType) (*requiredProp, bool) { desc := false cols := make([]*expression.Column, 0, len(items)) for i, item := range items { @@ -246,7 +264,7 @@ func getPropByOrderByItems(items []*ByItems) (*requiredProp, bool) { return nil, false } } - return &requiredProp{cols, desc}, true + return &requiredProp{cols, desc, taskTp}, true } // convert2NewPhysicalPlan implements PhysicalPlan interface. 
@@ -260,21 +278,25 @@ func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) if task != nil { return task, nil } + if prop.taskTp != rootTaskType { + // TODO: This is a trick here, because an operator that can be pushed to Coprocessor can never be pushed across sort. + // e.g. If an aggregation wants to be pushed, the SQL is always like select count(*) from t order by ... + // The Sort will be on top of Aggregation. If the SQL is like select count(*) from (select * from s order by k). + // The Aggregation will also be blocked by projection. In the future we will break this restriction. + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } // enforce branch task, err = p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{}) if err != nil { return nil, errors.Trace(err) } task = p.attach2TaskProfile(task) - newProp, canPassProp := getPropByOrderByItems(p.ByItems) + newProp, canPassProp := getPropByOrderByItems(p.ByItems, rootTaskType) if canPassProp { orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp) if err != nil { return nil, errors.Trace(err) } - if cop, ok := orderedTask.(*copTaskProfile); ok { - orderedTask = finishCopTask(cop, p.ctx, p.allocator) - } if orderedTask.cost() < task.cost() { task = orderedTask } @@ -283,6 +305,7 @@ func (p *Sort) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) return task, p.storeTaskProfile(prop, task) } +// convert2NewPhysicalPlan implements LogicalPlan interface. 
func (p *TopN) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) { task, err := p.getTaskProfile(prop) if err != nil { @@ -291,26 +314,61 @@ func (p *TopN) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) if task != nil { return task, nil } - // enforce branch - task, err = p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{}) - if err != nil { - return nil, errors.Trace(err) + if prop.taskTp != rootTaskType { + // TopN can only return rootTask. + return invalidTask, p.storeTaskProfile(prop, invalidTask) } - task = p.attach2TaskProfile(task) - newProp, canPassProp := getPropByOrderByItems(p.ByItems) - if canPassProp { - orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp) + for _, taskTp := range wholeTaskTypes { + // Try to enforce topN for child. + optTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: taskTp}) if err != nil { return nil, errors.Trace(err) } - limit := Limit{Offset: p.Offset, Count: p.Count}.init(p.allocator, p.ctx) - limit.SetSchema(p.schema) - orderedTask = limit.attach2TaskProfile(orderedTask) - if orderedTask.cost() < task.cost() { - task = orderedTask + optTask = p.attach2TaskProfile(optTask) + // Try to enforce sort to child and add limit for it. 
+ newProp, canPassProp := getPropByOrderByItems(p.ByItems, taskTp) + if canPassProp { + orderedTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(newProp) + if err != nil { + return nil, errors.Trace(err) + } + limit := Limit{Offset: p.Offset, Count: p.Count}.init(p.allocator, p.ctx) + limit.SetSchema(p.schema) + orderedTask = limit.attach2TaskProfile(orderedTask) + if orderedTask.cost() < optTask.cost() { + optTask = orderedTask + } + } + optTask = prop.enforceProperty(optTask, p.ctx, p.allocator) + if task == nil || task.cost() > optTask.cost() { + task = optTask + } + } + return task, p.storeTaskProfile(prop, task) +} + +func (p *Limit) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) { + task, err := p.getTaskProfile(prop) + if err != nil { + return nil, errors.Trace(err) + } + if task != nil { + return task, nil + } + if prop.taskTp != rootTaskType { + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } + for _, taskTp := range wholeTaskTypes { + optTask, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: taskTp}) + if err != nil { + return nil, errors.Trace(err) + } + optTask = p.attach2TaskProfile(optTask) + optTask = prop.enforceProperty(optTask, p.ctx, p.allocator) + if task == nil || task.cost() > optTask.cost() { + task = optTask } } - task = prop.enforceProperty(task, p.ctx, p.allocator) return task, p.storeTaskProfile(prop, task) } @@ -323,6 +381,9 @@ func (p *baseLogicalPlan) convert2NewPhysicalPlan(prop *requiredProp) (taskProfi if task != nil { return task, nil } + if prop.taskTp != rootTaskType { + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } if len(p.basePlan.children) == 0 { task = &rootTaskProfile{p: p.basePlan.self.(PhysicalPlan)} } else { @@ -485,6 +546,13 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo // On this way, it's double read case. 
copTask.tablePlan = PhysicalTableScan{Columns: p.Columns, Table: is.Table}.init(p.allocator, p.ctx) copTask.tablePlan.SetSchema(p.schema) + // If its parent requires single read task, return max cost. + if prop.taskTp == copSingleReadTaskType { + return &copTaskProfile{cst: math.MaxFloat64}, nil + } + } else if prop.taskTp == copDoubleReadTaskType { + // If its parent requires double read task, return max cost. + return &copTaskProfile{cst: math.MaxFloat64}, nil + } var indexCols []*expression.Column for _, col := range idx.Columns { @@ -526,6 +594,9 @@ func (p *DataSource) convertToIndexScan(prop *requiredProp, idx *model.IndexInfo task = tryToAddUnionScan(copTask, p.pushedDownConds, p.ctx, p.allocator) task = prop.enforceProperty(task, p.ctx, p.allocator) } + if prop.taskTp == rootTaskType { + task = finishCopTask(task, p.ctx, p.allocator) + } return task, nil } @@ -572,6 +643,9 @@ func matchIndicesProp(idxCols []*model.IndexColumn, propCols []*expression.Colum // convertToTableScan converts the DataSource to table scan. func (p *DataSource) convertToTableScan(prop *requiredProp) (task taskProfile, err error) { + if prop.taskTp == copDoubleReadTaskType { + return &copTaskProfile{cst: math.MaxFloat64}, nil + } ts := PhysicalTableScan{ Table: p.tableInfo, Columns: p.Columns, @@ -631,6 +705,9 @@ func (p *DataSource) convertToTableScan(prop *requiredProp, e task = tryToAddUnionScan(copTask, p.pushedDownConds, p.ctx, p.allocator) task = prop.enforceProperty(task, p.ctx, p.allocator) } + if prop.taskTp == rootTaskType { + task = finishCopTask(task, p.ctx, p.allocator) + } return task, nil } @@ -642,6 +719,10 @@ func (p *Union) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) if task != nil { return task, nil } + if prop.taskTp != rootTaskType { + // Union can only return rootTask. + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } // Union is a sort blocker. We can only enforce it. 
tasks := make([]taskProfile, 0, len(p.children)) for _, child := range p.children { @@ -692,6 +773,10 @@ func (p *LogicalAggregation) convert2NewPhysicalPlan(prop *requiredProp) (taskPr if task != nil { return task, nil } + if prop.taskTp != rootTaskType { + // Aggregation can only return rootTask. + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } task, err = p.convert2HashAggregation(prop) if err != nil { return nil, errors.Trace(err) @@ -699,21 +784,26 @@ func (p *LogicalAggregation) convert2NewPhysicalPlan(prop *requiredProp) (taskPr return task, p.storeTaskProfile(prop, task) } -func (p *LogicalAggregation) convert2HashAggregation(prop *requiredProp) (taskProfile, error) { - task, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{}) - if err != nil { - return nil, errors.Trace(err) +func (p *LogicalAggregation) convert2HashAggregation(prop *requiredProp) (bestTask taskProfile, _ error) { + for _, taskTp := range wholeTaskTypes { + task, err := p.children[0].(LogicalPlan).convert2NewPhysicalPlan(&requiredProp{taskTp: taskTp}) + if err != nil { + return nil, errors.Trace(err) + } + ha := PhysicalAggregation{ + GroupByItems: p.GroupByItems, + AggFuncs: p.AggFuncs, + HasGby: len(p.GroupByItems) > 0, + AggType: CompleteAgg, + }.init(p.allocator, p.ctx) + ha.SetSchema(p.schema) + task = ha.attach2TaskProfile(task) + task = prop.enforceProperty(task, p.ctx, p.allocator) + if bestTask == nil || task.cost() < bestTask.cost() { + bestTask = task + } } - ha := PhysicalAggregation{ - GroupByItems: p.GroupByItems, - AggFuncs: p.AggFuncs, - HasGby: len(p.GroupByItems) > 0, - AggType: CompleteAgg, - }.init(p.allocator, p.ctx) - ha.SetSchema(p.schema) - task = ha.attach2TaskProfile(task) - task = prop.enforceProperty(task, p.ctx, p.allocator) - return task, nil + return } func (p *LogicalApply) convert2NewPhysicalPlan(prop *requiredProp) (taskProfile, error) { @@ -724,11 +814,15 @@ func (p *LogicalApply) convert2NewPhysicalPlan(prop 
*requiredProp) (taskProfile, if task != nil { return task, nil } - // TODO: refine this code. + if prop.taskTp != rootTaskType { + // Apply can only return rootTask. + return invalidTask, p.storeTaskProfile(prop, invalidTask) + } + // TODO: Refine this code. if p.JoinType == SemiJoin || p.JoinType == LeftOuterSemiJoin { - task, err = p.convert2SemiJoin(&requiredProp{}) + task, err = p.convert2SemiJoin(&requiredProp{taskTp: rootTaskType}) } else { - task, err = p.convert2HashJoin(&requiredProp{}) + task, err = p.convert2HashJoin(&requiredProp{taskTp: rootTaskType}) } if err != nil { return nil, errors.Trace(err) diff --git a/plan/optimizer.go b/plan/optimizer.go index af9ee144ce..54f061b58c 100644 --- a/plan/optimizer.go +++ b/plan/optimizer.go @@ -144,7 +144,7 @@ func logicalOptimize(flag uint64, logic LogicalPlan, ctx context.Context, alloc } func dagPhysicalOptimize(logic LogicalPlan) (PhysicalPlan, error) { - task, err := logic.convert2NewPhysicalPlan(&requiredProp{}) + task, err := logic.convert2NewPhysicalPlan(&requiredProp{taskTp: rootTaskType}) if err != nil { return nil, errors.Trace(err) } diff --git a/plan/plan.go b/plan/plan.go index ce3dedc1b8..bd1782533f 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -75,16 +75,48 @@ func (c *columnProp) equal(nc *columnProp, ctx context.Context) bool { return c.col.Equal(nc.col, ctx) && c.desc == nc.desc } -// requriedProp stands for the required order property by parents. It will be all asc or desc. +// taskType is the type of execution task. +type taskType int + +const ( + rootTaskType taskType = iota + copSingleReadTaskType // TableScan and IndexScan + copDoubleReadTaskType // IndexLookUp +) + +// String implements fmt.Stringer interface. 
+func (t taskType) String() string { + switch t { + case rootTaskType: + return "rootTask" + case copSingleReadTaskType: + return "copSingleReadTask" + case copDoubleReadTaskType: + return "copDoubleReadTask" + } + return "UnknownTaskType" +} + +// requiredProp stands for the required physical property by parents. +// It contains the orders, if the order is desc and the task types. type requiredProp struct { cols []*expression.Column desc bool + // taskTp means the type of task that an operator requires. + // It needs to be specified because two different tasks can't be compared with cost directly. + // e.g. If a copTask takes less cost than a rootTask, we can't be sure that we must choose the former one. Because the copTask + // must be finished and increase its cost at some time, but we can't make sure of the finishing time. So the best way + // to make the comparison fair is to add taskType to required property. + taskTp taskType } func (p *requiredProp) equal(prop *requiredProp) bool { if len(p.cols) != len(prop.cols) || p.desc != prop.desc { return false } + if p.taskTp != prop.taskTp { + return false + } for i := range p.cols { if !p.cols[i].Equal(prop.cols[i], nil) { return false } @@ -99,18 +131,19 @@ func (p *requiredProp) isEmpty() bool { // getHashKey encodes prop to a unique key. The key will be stored in the memory table. func (p *requiredProp) getHashKey() ([]byte, error) { - datums := make([]types.Datum, 0, len(p.cols)*2+1) + datums := make([]types.Datum, 0, len(p.cols)*2+2) datums = append(datums, types.NewDatum(p.desc)) for _, c := range p.cols { datums = append(datums, types.NewDatum(c.FromID), types.NewDatum(c.Position)) } + datums = append(datums, types.NewDatum(int(p.taskTp))) bytes, err := codec.EncodeValue(nil, datums...) return bytes, errors.Trace(err) } // String implements fmt.Stringer interface. Just for test. 
func (p *requiredProp) String() string { - return fmt.Sprintf("Prop{cols: %s, desc: %v}", p.cols, p.desc) + return fmt.Sprintf("Prop{cols: %s, desc: %v, taskTp: %s}", p.cols, p.desc, p.taskTp) } type requiredProperty struct { diff --git a/plan/task_profile.go b/plan/task_profile.go index 2eb8f201b5..859d703810 100644 --- a/plan/task_profile.go +++ b/plan/task_profile.go @@ -210,6 +210,10 @@ func (t *rootTaskProfile) plan() PhysicalPlan { } func (p *Limit) attach2TaskProfile(profiles ...taskProfile) taskProfile { + // If the task is invalid, leave it unchanged. + if profiles[0].plan() == nil { + return profiles[0] + } profile := profiles[0].copy() if cop, ok := profile.(*copTaskProfile); ok { // If the task is copTask, the Limit can always be pushed down. @@ -264,6 +268,10 @@ func (p *Sort) attach2TaskProfile(profiles ...taskProfile) taskProfile { } func (p *TopN) attach2TaskProfile(profiles ...taskProfile) taskProfile { + // If the task is invalid, leave it unchanged. + if profiles[0].plan() == nil { + return profiles[0] + } profile := profiles[0].copy() // This is a topN plan. if copTask, ok := profile.(*copTaskProfile); ok && p.canPushDown() { @@ -389,6 +397,10 @@ func (p *PhysicalAggregation) newPartialAggregate() (partialAgg, finalAgg *Physi } func (p *PhysicalAggregation) attach2TaskProfile(profiles ...taskProfile) taskProfile { + // If the task is invalid, leave it unchanged. + if profiles[0].plan() == nil { + return profiles[0] + } // TODO: We only consider hash aggregation here. profile := profiles[0].copy() if cop, ok := profile.(*copTaskProfile); ok {