From 3625c1dd4ee8ad29a2e1a5c19e948ec685adaa8e Mon Sep 17 00:00:00 2001 From: Han Fei Date: Sat, 1 Apr 2017 13:53:31 +0800 Subject: [PATCH] plan: add base phyiscal plan. (#2975) add basePhysicalPlan to implement some common physical interface. --- plan/aggregation_push_down.go | 29 ++--- plan/expression_rewriter.go | 36 ++---- plan/initialize.go | 212 ++++++++++++++++++++++++++++++++++ plan/join_reorder.go | 13 +-- plan/logical_plan_builder.go | 127 ++++++-------------- plan/logical_plan_test.go | 6 +- plan/logical_plans.go | 43 +++++++ plan/match_property.go | 126 +++++--------------- plan/physical_plan_builder.go | 148 ++++++++---------------- plan/physical_plans.go | 128 +++++++++++++++++--- plan/plan.go | 91 +++++++++------ plan/planbuilder.go | 92 +++++---------- plan/plans.go | 10 ++ plan/predicate_push_down.go | 8 +- plan/task_profile.go | 7 +- 15 files changed, 607 insertions(+), 469 deletions(-) create mode 100644 plan/initialize.go diff --git a/plan/aggregation_push_down.go b/plan/aggregation_push_down.go index 6479891cc5..fd1a6fb999 100644 --- a/plan/aggregation_push_down.go +++ b/plan/aggregation_push_down.go @@ -252,12 +252,10 @@ func (a *aggregationOptimizer) checkAnyCountAndSum(aggFuncs []expression.Aggrega } func (a *aggregationOptimizer) makeNewAgg(aggFuncs []expression.AggregationFunction, gbyCols []*expression.Column) *Aggregation { - agg := &Aggregation{ - GroupByItems: expression.Column2Exprs(gbyCols), - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, a.allocator), - groupByCols: gbyCols, - } - agg.initIDAndContext(a.ctx) + agg := Aggregation{ + GroupByItems: expression.Column2Exprs(gbyCols), + groupByCols: gbyCols, + }.init(a.allocator, a.ctx) var newAggFuncs []expression.AggregationFunction schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncs)+len(gbyCols))...) for _, aggFunc := range aggFuncs { @@ -278,13 +276,11 @@ func (a *aggregationOptimizer) makeNewAgg(aggFuncs []expression.AggregationFunct // pushAggCrossUnion will try to push the agg down to the union. If the new aggregation's group-by columns doesn't contain unique key. // We will return the new aggregation. Otherwise we will transform the aggregation to projection. func (a *aggregationOptimizer) pushAggCrossUnion(agg *Aggregation, unionSchema *expression.Schema, unionChild LogicalPlan) LogicalPlan { - newAgg := &Aggregation{ - AggFuncs: make([]expression.AggregationFunction, 0, len(agg.AggFuncs)), - GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)), - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, a.allocator), - } + newAgg := Aggregation{ + AggFuncs: make([]expression.AggregationFunction, 0, len(agg.AggFuncs)), + GroupByItems: make([]expression.Expression, 0, len(agg.GroupByItems)), + }.init(a.allocator, a.ctx) newAgg.SetSchema(agg.schema.Clone()) - newAgg.initIDAndContext(a.ctx) for _, aggFunc := range agg.AggFuncs { newAggFunc := aggFunc.Clone() newArgs := make([]expression.Expression, 0, len(newAggFunc.GetArgs())) @@ -428,12 +424,9 @@ func (a *aggregationOptimizer) tryToEliminateAggregation(agg *Aggregation) *Proj } func (a *aggregationOptimizer) convertAggToProj(agg *Aggregation, ctx context.Context, allocator *idAllocator) *Projection { - proj := &Projection{ - Exprs: make([]expression.Expression, 0, len(agg.AggFuncs)), - baseLogicalPlan: newBaseLogicalPlan(TypeProj, allocator), - } - proj.self = proj - proj.initIDAndContext(ctx) + proj := Projection{ + Exprs: make([]expression.Expression, 0, len(agg.AggFuncs)), + }.init(a.allocator, a.ctx) for _, fun := range agg.AggFuncs { expr := a.rewriteExpr(fun) proj.Exprs = append(proj.Exprs, expr) diff --git a/plan/expression_rewriter.go b/plan/expression_rewriter.go index 842a76456d..860afcdcd7 100644 --- a/plan/expression_rewriter.go +++ b/plan/expression_rewriter.go @@ -288,12 +288,9 @@ func (er *expressionRewriter) handleOtherComparableSubq(lexpr, rexpr expression. funcName = ast.AggFuncMin } aggFunc := expression.NewAggFunction(funcName, []expression.Expression{rexpr}, false) - agg := &Aggregation{ - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, er.b.allocator), - AggFuncs: []expression.AggregationFunction{aggFunc}, - } - agg.initIDAndContext(er.b.ctx) - agg.self = agg + agg := Aggregation{ + AggFuncs: []expression.AggregationFunction{aggFunc}, + }.init(er.b.allocator, er.ctx) addChild(agg, np) aggCol0 := &expression.Column{ ColName: model.NewCIStr("agg_Col_0"), @@ -352,12 +349,9 @@ func (er *expressionRewriter) buildQuantifierPlan(agg *Aggregation, cond, rexpr outerSchemaLen := er.p.Schema().Len() er.p = er.b.buildApplyWithJoinType(er.p, agg, InnerJoin) joinSchema := er.p.Schema() - proj := &Projection{ - baseLogicalPlan: newBaseLogicalPlan(TypeProj, er.b.allocator), - Exprs: expression.Column2Exprs(joinSchema.Clone().Columns[:outerSchemaLen]), - } - proj.self = proj - proj.initIDAndContext(er.ctx) + proj := Projection{ + Exprs: expression.Column2Exprs(joinSchema.Clone().Columns[:outerSchemaLen]), + }.init(er.b.allocator, er.ctx) proj.SetSchema(expression.NewSchema(joinSchema.Clone().Columns[:outerSchemaLen]...)) proj.Exprs = append(proj.Exprs, cond) proj.schema.Append(&expression.Column{ @@ -377,12 +371,9 @@ func (er *expressionRewriter) buildQuantifierPlan(agg *Aggregation, cond, rexpr func (er *expressionRewriter) handleNEAny(lexpr, rexpr expression.Expression, np LogicalPlan) { firstRowFunc := expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{rexpr}, false) countFunc := expression.NewAggFunction(ast.AggFuncCount, []expression.Expression{rexpr.Clone()}, true) - agg := &Aggregation{ - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, er.b.allocator), - AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc}, - } - agg.initIDAndContext(er.b.ctx) - agg.self = agg + agg := Aggregation{ + AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc}, + }.init(er.b.allocator, er.ctx) addChild(agg, np) firstRowResultCol := &expression.Column{ ColName: model.NewCIStr("col_firstRow"), @@ -408,12 +399,9 @@ func (er *expressionRewriter) handleNEAny(lexpr, rexpr expression.Expression, np func (er *expressionRewriter) handleEQAll(lexpr, rexpr expression.Expression, np LogicalPlan) { firstRowFunc := expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{rexpr}, false) countFunc := expression.NewAggFunction(ast.AggFuncCount, []expression.Expression{rexpr.Clone()}, true) - agg := &Aggregation{ - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, er.b.allocator), - AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc}, - } - agg.initIDAndContext(er.b.ctx) - agg.self = agg + agg := Aggregation{ + AggFuncs: []expression.AggregationFunction{firstRowFunc, countFunc}, + }.init(er.b.allocator, er.ctx) addChild(agg, np) firstRowResultCol := &expression.Column{ ColName: model.NewCIStr("col_firstRow"), diff --git a/plan/initialize.go b/plan/initialize.go new file mode 100644 index 0000000000..7475cf6a50 --- /dev/null +++ b/plan/initialize.go @@ -0,0 +1,212 @@ +// Copyright 2017 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package plan + +import "github.com/pingcap/tidb/context" + +func (p Aggregation) init(allocator *idAllocator, ctx context.Context) *Aggregation { + p.basePlan = newBasePlan(TypeAgg, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + return &p +} + +func (p Join) init(allocator *idAllocator, ctx context.Context) *Join { + p.basePlan = newBasePlan(TypeJoin, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + return &p +} + +func (p DataSource) init(allocator *idAllocator, ctx context.Context) *DataSource { + p.basePlan = newBasePlan(TypeTableScan, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + return &p +} + +func (p Apply) init(allocator *idAllocator, ctx context.Context) *Apply { + p.basePlan = newBasePlan(TypeApply, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + return &p +} + +func (p Selection) init(allocator *idAllocator, ctx context.Context) *Selection { + p.basePlan = newBasePlan(TypeSel, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Projection) init(allocator *idAllocator, ctx context.Context) *Projection { + p.basePlan = newBasePlan(TypeProj, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Union) init(allocator *idAllocator, ctx context.Context) *Union { + p.basePlan = newBasePlan(TypeUnion, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Sort) init(allocator *idAllocator, ctx context.Context) *Sort { + p.basePlan = newBasePlan(TypeSort, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Limit) init(allocator *idAllocator, ctx context.Context) *Limit { + p.basePlan = newBasePlan(TypeLimit, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p TableDual) init(allocator *idAllocator, ctx context.Context) *TableDual { + p.basePlan = newBasePlan(TypeDual, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Exists) init(allocator *idAllocator, ctx context.Context) *Exists { + p.basePlan = newBasePlan(TypeExists, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p MaxOneRow) init(allocator *idAllocator, ctx context.Context) *MaxOneRow { + p.basePlan = newBasePlan(TypeMaxOneRow, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Update) init(allocator *idAllocator, ctx context.Context) *Update { + p.basePlan = newBasePlan(TypeUpate, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Delete) init(allocator *idAllocator, ctx context.Context) *Delete { + p.basePlan = newBasePlan(TypeDelete, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Insert) init(allocator *idAllocator, ctx context.Context) *Insert { + p.basePlan = newBasePlan(TypeInsert, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Show) init(allocator *idAllocator, ctx context.Context) *Show { + p.basePlan = newBasePlan(TypeShow, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Analyze) init(allocator *idAllocator, ctx context.Context) *Analyze { + p.basePlan = newBasePlan(TypeAnalyze, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p SelectLock) init(allocator *idAllocator, ctx context.Context) *SelectLock { + p.basePlan = newBasePlan(TypeLock, allocator, ctx, &p) + p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalTableScan) init(allocator *idAllocator, ctx context.Context) *PhysicalTableScan { + p.basePlan = newBasePlan(TypeTableScan, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalIndexScan) init(allocator *idAllocator, ctx context.Context) *PhysicalIndexScan { + p.basePlan = newBasePlan(TypeIdxScan, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalMemTable) init(allocator *idAllocator, ctx context.Context) *PhysicalMemTable { + p.basePlan = newBasePlan(TypeMemTableScan, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalDummyScan) init(allocator *idAllocator, ctx context.Context) *PhysicalDummyScan { + p.basePlan = newBasePlan(TypeDummy, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalHashJoin) init(allocator *idAllocator, ctx context.Context) *PhysicalHashJoin { + tp := TypeHashRightJoin + if p.SmallTable == 1 { + tp = TypeHashLeftJoin + } + p.basePlan = newBasePlan(tp, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalHashSemiJoin) init(allocator *idAllocator, ctx context.Context) *PhysicalHashSemiJoin { + p.basePlan = newBasePlan(TypeHashSemiJoin, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalMergeJoin) init(allocator *idAllocator, ctx context.Context) *PhysicalMergeJoin { + p.basePlan = newBasePlan(TypeMergeJoin, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalAggregation) init(allocator *idAllocator, ctx context.Context) *PhysicalAggregation { + tp := TypeHashAgg + if p.AggType == StreamedAgg { + tp = TypeStreamAgg + } + p.basePlan = newBasePlan(tp, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalApply) init(allocator *idAllocator, ctx context.Context) *PhysicalApply { + p.basePlan = newBasePlan(TypeApply, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p Cache) init(allocator *idAllocator, ctx context.Context) *Cache { + p.basePlan = newBasePlan(TypeCache, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} + +func (p PhysicalUnionScan) init(allocator *idAllocator, ctx context.Context) *PhysicalUnionScan { + p.basePlan = newBasePlan(TypeCache, allocator, ctx, &p) + p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan) + return &p +} diff --git a/plan/join_reorder.go b/plan/join_reorder.go index 21e95afd89..3d51dd6a10 100644 --- a/plan/join_reorder.go +++ b/plan/join_reorder.go @@ -18,6 +18,7 @@ import ( "github.com/ngaut/log" "github.com/pingcap/tidb/ast" + "github.com/pingcap/tidb/context" "github.com/pingcap/tidb/expression" ) @@ -56,6 +57,7 @@ type joinReOrderSolver struct { resultJoin LogicalPlan groupRank []*rankInfo allocator *idAllocator + ctx context.Context } type edgeList []*rankInfo @@ -183,13 +185,10 @@ func (e *joinReOrderSolver) makeBushyJoin(cartesianJoinGroup []LogicalPlan) { } func (e *joinReOrderSolver) newJoin(lChild, rChild LogicalPlan) *Join { - join := &Join{ - JoinType: InnerJoin, - reordered: true, - baseLogicalPlan: newBaseLogicalPlan(TypeJoin, e.allocator), - } - join.self = join - join.initIDAndContext(lChild.context()) + join := Join{ + JoinType: InnerJoin, + reordered: true, + }.init(e.allocator, e.ctx) join.SetChildren(lChild, rChild) join.SetSchema(expression.MergeSchema(lChild.Schema(), rChild.Schema())) lChild.SetParents(join) diff --git a/plan/logical_plan_builder.go b/plan/logical_plan_builder.go index 6c4ed2239c..3b239950f5 100644 --- a/plan/logical_plan_builder.go +++ b/plan/logical_plan_builder.go @@ -52,11 +52,8 @@ func (p *Aggregation) collectGroupByColumns() { func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.AggregateFuncExpr, gbyItems []expression.Expression) (LogicalPlan, map[int]int) { b.optFlag = b.optFlag | flagBuildKeyInfo b.optFlag = b.optFlag | flagAggregationOptimize - agg := &Aggregation{ - AggFuncs: make([]expression.AggregationFunction, 0, len(aggFuncList)), - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, b.allocator)} - agg.self = agg - agg.initIDAndContext(b.ctx) + + agg := Aggregation{AggFuncs: make([]expression.AggregationFunction, 0, len(aggFuncList))}.init(b.allocator, b.ctx) schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncList)+p.Schema().Len())...) // aggIdxMap maps the old index to new index after applying common aggregation functions elimination. aggIndexMap := make(map[int]int) @@ -222,11 +219,9 @@ func (b *planBuilder) buildJoin(join *ast.Join) LogicalPlan { rightAlias := extractTableAlias(rightPlan) newSchema := expression.MergeSchema(leftPlan.Schema(), rightPlan.Schema()) - joinPlan := &Join{baseLogicalPlan: newBaseLogicalPlan(TypeJoin, b.allocator)} + joinPlan := Join{}.init(b.allocator, b.ctx) addChild(joinPlan, leftPlan) addChild(joinPlan, rightPlan) - joinPlan.self = joinPlan - joinPlan.initIDAndContext(b.ctx) joinPlan.SetSchema(newSchema) if b.TableHints() != nil { @@ -263,9 +258,7 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe b.optFlag = b.optFlag | flagPredicatePushDown conditions := splitWhere(where) expressions := make([]expression.Expression, 0, len(conditions)) - selection := &Selection{baseLogicalPlan: newBaseLogicalPlan(TypeSel, b.allocator)} - selection.self = selection - selection.initIDAndContext(b.ctx) + selection := Selection{}.init(b.allocator, b.ctx) for _, cond := range conditions { expr, np, err := b.rewrite(cond, p, AggMapper, false) if err != nil { @@ -289,12 +282,7 @@ func (b *planBuilder) buildSelection(p LogicalPlan, where ast.ExprNode, AggMappe // buildProjection returns a Projection plan and non-aux columns length. func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField, mapper map[*ast.AggregateFuncExpr]int) (LogicalPlan, int) { - proj := &Projection{ - Exprs: make([]expression.Expression, 0, len(fields)), - baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator), - } - proj.self = proj - proj.initIDAndContext(b.ctx) + proj := Projection{Exprs: make([]expression.Expression, 0, len(fields))}.init(b.allocator, b.ctx) schema := expression.NewSchema(make([]*expression.Column, 0, len(fields))...) oldLen := 0 for _, field := range fields { @@ -351,25 +339,21 @@ func (b *planBuilder) buildProjection(p LogicalPlan, fields []*ast.SelectField, func (b *planBuilder) buildDistinct(child LogicalPlan, length int) LogicalPlan { b.optFlag = b.optFlag | flagBuildKeyInfo b.optFlag = b.optFlag | flagAggregationOptimize - agg := &Aggregation{ - baseLogicalPlan: newBaseLogicalPlan(TypeAgg, b.allocator), - AggFuncs: make([]expression.AggregationFunction, 0, child.Schema().Len()), - GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length])} + agg := Aggregation{ + AggFuncs: make([]expression.AggregationFunction, 0, child.Schema().Len()), + GroupByItems: expression.Column2Exprs(child.Schema().Clone().Columns[:length]), + }.init(b.allocator, b.ctx) agg.collectGroupByColumns() for _, col := range child.Schema().Columns { agg.AggFuncs = append(agg.AggFuncs, expression.NewAggFunction(ast.AggFuncFirstRow, []expression.Expression{col}, false)) } - agg.self = agg - agg.initIDAndContext(b.ctx) addChild(agg, child) agg.SetSchema(child.Schema().Clone()) return agg } func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan { - u := &Union{baseLogicalPlan: newBaseLogicalPlan(TypeUnion, b.allocator)} - u.self = u - u.initIDAndContext(b.ctx) + u := Union{}.init(b.allocator, b.ctx) u.children = make([]Plan, len(union.SelectList.Selects)) for i, sel := range union.SelectList.Selects { u.children[i] = b.buildSelect(sel) @@ -381,12 +365,7 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) LogicalPlan { return nil } if _, ok := sel.(*Projection); !ok { - proj := &Projection{ - baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator), - Exprs: expression.Column2Exprs(sel.Schema().Columns), - } - proj.initIDAndContext(b.ctx) - proj.self = proj + proj := Projection{Exprs: expression.Column2Exprs(sel.Schema().Columns)}.init(b.allocator, b.ctx) proj.SetSchema(sel.Schema().Clone()) sel.SetParents(proj) proj.SetChildren(sel) @@ -447,10 +426,8 @@ func (by *ByItems) String() string { } func (b *planBuilder) buildSort(p LogicalPlan, byItems []*ast.ByItem, aggMapper map[*ast.AggregateFuncExpr]int) LogicalPlan { - var exprs []*ByItems - sort := &Sort{baseLogicalPlan: newBaseLogicalPlan(TypeSort, b.allocator)} - sort.self = sort - sort.initIDAndContext(b.ctx) + sort := Sort{}.init(b.allocator, b.ctx) + exprs := make([]*ByItems, 0, len(byItems)) for _, item := range byItems { it, np, err := b.rewrite(item.Expr, p, aggMapper, true) if err != nil { @@ -504,13 +481,11 @@ func (b *planBuilder) buildLimit(src LogicalPlan, limit *ast.Limit) LogicalPlan return nil } } - li := &Limit{ - Offset: offset, - Count: count, - baseLogicalPlan: newBaseLogicalPlan(TypeLimit, b.allocator), - } - li.self = li - li.initIDAndContext(b.ctx) + + li := Limit{ + Offset: offset, + Count: count, + }.init(b.allocator, b.ctx) addChild(li, src) li.SetSchema(src.Schema().Clone()) return li @@ -967,12 +942,7 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan { } sel.Fields.Fields = originalFields if oldLen != p.Schema().Len() { - proj := &Projection{ - baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator), - Exprs: expression.Column2Exprs(p.Schema().Columns[:oldLen]), - } - proj.initIDAndContext(b.ctx) - proj.self = proj + proj := Projection{Exprs: expression.Column2Exprs(p.Schema().Columns[:oldLen])}.init(b.allocator, b.ctx) addChild(proj, p) proj.SetSchema(expression.NewSchema(p.Schema().Columns[:oldLen]...)) return proj @@ -982,9 +952,7 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan { } func (b *planBuilder) buildTableDual() LogicalPlan { - dual := &TableDual{baseLogicalPlan: newBaseLogicalPlan(TypeDual, b.allocator)} - dual.self = dual - dual.initIDAndContext(b.ctx) + dual := TableDual{}.init(b.allocator, b.ctx) dual.SetSchema(expression.NewSchema()) return dual } @@ -1005,15 +973,12 @@ func (b *planBuilder) buildDataSource(tn *ast.TableName) LogicalPlan { } tableInfo := tbl.Meta() - p := &DataSource{ - indexHints: tn.IndexHints, - tableInfo: tableInfo, - baseLogicalPlan: newBaseLogicalPlan(TypeTableScan, b.allocator), - statisticTable: statisticTable, - DBName: schemaName, - } - p.self = p - p.initIDAndContext(b.ctx) + p := DataSource{ + indexHints: tn.IndexHints, + tableInfo: tableInfo, + statisticTable: statisticTable, + DBName: schemaName, + }.init(b.allocator, b.ctx) b.visitInfo = appendVisitInfo(b.visitInfo, mysql.SelectPriv, schemaName.L, tableInfo.Name.L, "") @@ -1055,13 +1020,8 @@ func (b *planBuilder) buildApplyWithJoinType(outerPlan, innerPlan LogicalPlan, t b.optFlag = b.optFlag | flagPredicatePushDown b.optFlag = b.optFlag | flagBuildKeyInfo b.optFlag = b.optFlag | flagDecorrelate - join := &Join{ - JoinType: tp, - baseLogicalPlan: newBaseLogicalPlan(TypeJoin, b.allocator), - } - ap := &Apply{Join: *join} - ap.initIDAndContext(b.ctx) - ap.self = ap + ap := Apply{Join: Join{JoinType: tp}}.init(b.allocator, b.ctx) + addChild(ap, outerPlan) addChild(ap, innerPlan) ap.SetSchema(expression.MergeSchema(outerPlan.Schema(), innerPlan.Schema())) @@ -1078,7 +1038,8 @@ func (b *planBuilder) buildSemiApply(outerPlan, innerPlan LogicalPlan, condition b.optFlag = b.optFlag | flagDecorrelate join := b.buildSemiJoin(outerPlan, innerPlan, condition, asScalar, not) ap := &Apply{Join: *join} - ap.initIDAndContext(b.ctx) + ap.tp = TypeApply + ap.id = ap.tp + ap.allocator.allocID() ap.self = ap ap.children[0].SetParents(ap) ap.children[1].SetParents(ap) @@ -1105,9 +1066,7 @@ out: break out } } - exists := &Exists{baseLogicalPlan: newBaseLogicalPlan(TypeExists, b.allocator)} - exists.self = exists - exists.initIDAndContext(b.ctx) + exists := Exists{}.init(b.allocator, b.ctx) addChild(exists, p) newCol := &expression.Column{ FromID: exists.id, @@ -1118,18 +1077,14 @@ out: } func (b *planBuilder) buildMaxOneRow(p LogicalPlan) LogicalPlan { - maxOneRow := &MaxOneRow{baseLogicalPlan: newBaseLogicalPlan(TypeMaxOneRow, b.allocator)} - maxOneRow.self = maxOneRow - maxOneRow.initIDAndContext(b.ctx) + maxOneRow := MaxOneRow{}.init(b.allocator, b.ctx) addChild(maxOneRow, p) maxOneRow.SetSchema(p.Schema().Clone()) return maxOneRow } func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onCondition []expression.Expression, asScalar bool, not bool) *Join { - joinPlan := &Join{baseLogicalPlan: newBaseLogicalPlan(TypeJoin, b.allocator)} - joinPlan.self = joinPlan - joinPlan.initIDAndContext(b.ctx) + joinPlan := Join{}.init(b.allocator, b.ctx) for i, expr := range onCondition { onCondition[i] = expr.Decorrelate(outerPlan.Schema()) } @@ -1197,10 +1152,7 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) LogicalPlan { return nil } p = np - updt := &Update{OrderedList: orderedList, baseLogicalPlan: newBaseLogicalPlan(TypeUpate, b.allocator)} - updt.ctx = b.ctx - updt.self = updt - updt.initIDAndContext(b.ctx) + updt := Update{OrderedList: orderedList}.init(b.allocator, b.ctx) addChild(updt, p) updt.SetSchema(p.Schema()) return updt @@ -1267,13 +1219,10 @@ func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) LogicalPlan { tables = delete.Tables.Tables } - del := &Delete{ - Tables: tables, - IsMultiTable: delete.IsMultiTable, - baseLogicalPlan: newBaseLogicalPlan(TypeDelete, b.allocator), - } - del.self = del - del.initIDAndContext(b.ctx) + del := Delete{ + Tables: tables, + IsMultiTable: delete.IsMultiTable, + }.init(b.allocator, b.ctx) addChild(del, p) del.SetSchema(expression.NewSchema()) diff --git a/plan/logical_plan_test.go b/plan/logical_plan_test.go index a53cdaee67..09ad889e8a 100644 --- a/plan/logical_plan_test.go +++ b/plan/logical_plan_test.go @@ -1137,11 +1137,9 @@ func (s *testPlanSuite) TestColumnPruning(c *C) { } func (s *testPlanSuite) TestAllocID(c *C) { - pA := &DataSource{baseLogicalPlan: newBaseLogicalPlan(TypeTableScan, new(idAllocator))} - pB := &DataSource{baseLogicalPlan: newBaseLogicalPlan(TypeTableScan, new(idAllocator))} ctx := mockContext() - pA.initIDAndContext(ctx) - pB.initIDAndContext(ctx) + pA := DataSource{}.init(new(idAllocator), ctx) + pB := DataSource{}.init(new(idAllocator), ctx) c.Assert(pA.id, Equals, pB.id) } diff --git a/plan/logical_plans.go b/plan/logical_plans.go index f2055b29d2..5f003744cf 100644 --- a/plan/logical_plans.go +++ b/plan/logical_plans.go @@ -22,6 +22,27 @@ import ( "github.com/pingcap/tidb/util/types" ) +var ( + _ LogicalPlan = &Join{} + _ LogicalPlan = &Aggregation{} + _ LogicalPlan = &Projection{} + _ LogicalPlan = &Selection{} + _ LogicalPlan = &Apply{} + _ LogicalPlan = &Exists{} + _ LogicalPlan = &MaxOneRow{} + _ LogicalPlan = &TableDual{} + _ LogicalPlan = &DataSource{} + _ LogicalPlan = &Union{} + _ LogicalPlan = &Sort{} + _ LogicalPlan = &Update{} + _ LogicalPlan = &Delete{} + _ LogicalPlan = &SelectLock{} + _ LogicalPlan = &Limit{} + _ LogicalPlan = &Show{} + _ LogicalPlan = &Insert{} + _ LogicalPlan = &Analyze{} +) + // JoinType contains CrossJoin, InnerJoin, LeftOuterJoin, RightOuterJoin, FullOuterJoin, SemiJoin. type JoinType int @@ -40,6 +61,7 @@ const ( // Join is the logical join plan. type Join struct { + *basePlan baseLogicalPlan JoinType JoinType @@ -100,7 +122,10 @@ func (p *Join) extractCorrelatedCols() []*expression.CorrelatedColumn { // Projection represents a select fields plan. type Projection struct { + *basePlan baseLogicalPlan + basePhysicalPlan + Exprs []expression.Expression } @@ -114,6 +139,7 @@ func (p *Projection) extractCorrelatedCols() []*expression.CorrelatedColumn { // Aggregation represents an aggregate plan. type Aggregation struct { + *basePlan baseLogicalPlan AggFuncs []expression.AggregationFunction @@ -138,7 +164,9 @@ func (p *Aggregation) extractCorrelatedCols() []*expression.CorrelatedColumn { // Selection means a filter. type Selection struct { + *basePlan baseLogicalPlan + basePhysicalPlan // Originally the WHERE or ON condition is parsed into a single expression, // but after we converted to CNF(Conjunctive normal form), it can be @@ -184,21 +212,28 @@ func (p *Apply) extractCorrelatedCols() []*expression.CorrelatedColumn { // Exists checks if a query returns result. type Exists struct { + *basePlan baseLogicalPlan + basePhysicalPlan } // MaxOneRow checks if a query returns no more than one row. type MaxOneRow struct { + *basePlan baseLogicalPlan + basePhysicalPlan } // TableDual represents a dual table plan. type TableDual struct { + *basePlan baseLogicalPlan + basePhysicalPlan } // DataSource represents a tablescan without condition push down. type DataSource struct { + *basePlan baseLogicalPlan indexHints []*ast.IndexHint @@ -215,12 +250,16 @@ type DataSource struct { // Union represents Union plan. type Union struct { + *basePlan baseLogicalPlan + basePhysicalPlan } // Sort stands for the order by plan. type Sort struct { + *basePlan baseLogicalPlan + basePhysicalPlan ByItems []*ByItems ExecLimit *Limit @@ -236,14 +275,18 @@ func (p *Sort) extractCorrelatedCols() []*expression.CorrelatedColumn { // Update represents Update plan. type Update struct { + *basePlan baseLogicalPlan + basePhysicalPlan OrderedList []*expression.Assignment } // Delete represents a delete plan. type Delete struct { + *basePlan baseLogicalPlan + basePhysicalPlan Tables []*ast.TableName IsMultiTable bool diff --git a/plan/match_property.go b/plan/match_property.go index a36b117970..579fefc18c 100644 --- a/plan/match_property.go +++ b/plan/match_property.go @@ -28,13 +28,13 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy cost = float64(prop.limit.Count+prop.limit.Offset) * netWorkFactor } if len(prop.props) == 0 { - newTS := *ts + newTS := ts.Copy().(*PhysicalTableScan) newTS.addLimit(prop.limit) - p := newTS.tryToAddUnionScan(&newTS) + p := newTS.tryToAddUnionScan(newTS) return enforceProperty(prop, &physicalPlanInfo{p: p, cost: cost, count: infos[0].count}) } if len(prop.props) == 1 && ts.pkCol != nil && ts.pkCol.Equal(prop.props[0].col, ts.ctx) { - sortedTS := *ts + sortedTS := ts.Copy().(*PhysicalTableScan) sortedTS.Desc = prop.props[0].desc sortedTS.KeepOrder = true sortedTS.addLimit(prop.limit) @@ -42,14 +42,14 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy if len(sortedTS.tableFilterConditions) > 0 { cost += rowCount * cpuFactor } - p := sortedTS.tryToAddUnionScan(&sortedTS) + p := sortedTS.tryToAddUnionScan(sortedTS) return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{ p: p, cost: cost, count: infos[0].count}) } if prop.limit != nil { - sortedTS := *ts + sortedTS := ts.Copy().(*PhysicalTableScan) success := sortedTS.addTopN(ts.ctx, prop) if success { cost += rowCount * cpuFactor @@ -57,7 +57,7 @@ func (ts *PhysicalTableScan) matchProperty(prop *requiredProperty, infos ...*phy cost = rowCount * netWorkFactor } sortedTS.KeepOrder = true - p := sortedTS.tryToAddUnionScan(&sortedTS) + p := sortedTS.tryToAddUnionScan(sortedTS) return enforceProperty(prop, &physicalPlanInfo{ p: p, cost: cost, @@ -135,11 +135,11 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy } sortedCost := cost + rowCount*cpuFactor if allAsc || allDesc { - sortedIS := *is + sortedIS := is.Copy().(*PhysicalIndexScan) sortedIS.OutOfOrder = false sortedIS.Desc = allDesc && !allAsc sortedIS.addLimit(prop.limit) - p := sortedIS.tryToAddUnionScan(&sortedIS) + p := sortedIS.tryToAddUnionScan(sortedIS) return enforceProperty(&requiredProperty{limit: prop.limit}, &physicalPlanInfo{ p: p, cost: sortedCost, @@ -147,7 +147,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy } } if prop.limit != nil { - sortedIS := *is + sortedIS := is.Copy().(*PhysicalIndexScan) success := sortedIS.addTopN(is.ctx, prop) if success { cost += float64(infos[0].count) * cpuFactor @@ -155,7 +155,7 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy cost = float64(infos[0].count) * netWorkFactor } sortedIS.OutOfOrder = true - p := sortedIS.tryToAddUnionScan(&sortedIS) + p := sortedIS.tryToAddUnionScan(sortedIS) return enforceProperty(prop, &physicalPlanInfo{ p: p, cost: cost, @@ -167,17 +167,17 @@ func (is *PhysicalIndexScan) matchProperty(prop *requiredProperty, infos ...*phy // matchProperty implements PhysicalPlan matchProperty interface. func (p *PhysicalHashSemiJoin) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { lRes, rRes := childPlanInfo[0], childPlanInfo[1] - np := *p + np := p.Copy() np.SetChildren(lRes.p, rRes.p) cost := lRes.cost + rRes.cost - return &physicalPlanInfo{p: &np, cost: cost} + return &physicalPlanInfo{p: np, cost: cost} } // matchProperty implements PhysicalPlan matchProperty interface. func (p *PhysicalApply) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - np := *p + np := p.Copy() np.SetChildren(childPlanInfo[0].p) - return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost} + return &physicalPlanInfo{p: np, cost: childPlanInfo[0].cost} } func estimateJoinCount(lc uint64, rc uint64) uint64 { @@ -191,19 +191,19 @@ func estimateJoinCount(lc uint64, rc uint64) uint64 { // matchProperty implements PhysicalPlan matchProperty interface. func (p *PhysicalMergeJoin) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { lRes, rRes := childPlanInfo[0], childPlanInfo[1] - np := *p + np := p.Copy() np.SetChildren(lRes.p, rRes.p) cost := lRes.cost + rRes.cost - return &physicalPlanInfo{p: &np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)} + return &physicalPlanInfo{p: np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)} } // matchProperty implements PhysicalPlan matchProperty interface. func (p *PhysicalHashJoin) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { lRes, rRes := childPlanInfo[0], childPlanInfo[1] lCount, rCount := float64(lRes.count), float64(rRes.count) - np := *p + np := p.Copy().(*PhysicalHashJoin) np.SetChildren(lRes.p, rRes.p) if len(prop.props) != 0 { np.Concurrency = 1 @@ -214,12 +214,12 @@ func (p *PhysicalHashJoin) matchProperty(prop *requiredProperty, childPlanInfo . } else { cost += rCount + memoryFactor*lCount } - return &physicalPlanInfo{p: &np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)} + return &physicalPlanInfo{p: np, cost: cost, count: estimateJoinCount(lRes.count, rRes.count)} } // matchProperty implements PhysicalPlan matchProperty interface. func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - np := *p + np := p.Copy() children := make([]Plan, 0, len(childPlanInfo)) cost := float64(0) count := uint64(0) @@ -229,16 +229,16 @@ func (p *Union) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPla count += res.count } np.SetChildren(children...) - return &physicalPlanInfo{p: &np, cost: cost, count: count} + return &physicalPlanInfo{p: np, cost: cost, count: count} } // matchProperty implements PhysicalPlan matchProperty interface. func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { if p.onTable { res := p.children[0].(PhysicalPlan).matchProperty(prop, childPlanInfo...) - sel := *p + sel := p.Copy() sel.SetChildren(res.p) - res.p = &sel + res.p = sel res.count = uint64(float64(res.count) * selectionFactor) return res } @@ -249,9 +249,9 @@ func (p *Selection) matchProperty(prop *requiredProperty, childPlanInfo ...*phys func (p *PhysicalUnionScan) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { limit := prop.limit res := p.children[0].(PhysicalPlan).matchProperty(convertLimitOffsetToCount(prop), childPlanInfo...) - np := *p + np := p.Copy() np.SetChildren(res.p) - res.p = &np + res.p = np if limit != nil { res = addPlanToResponse(limit, res) } @@ -260,9 +260,9 @@ func (p *PhysicalUnionScan) matchProperty(prop *requiredProperty, childPlanInfo // matchProperty implements PhysicalPlan matchProperty interface. func (p *Projection) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - np := *p + np := p.Copy() np.SetChildren(childPlanInfo[0].p) - return &physicalPlanInfo{p: &np, cost: childPlanInfo[0].cost} + return &physicalPlanInfo{p: np, cost: childPlanInfo[0].cost} } // matchProperty implements PhysicalPlan matchProperty interface. @@ -271,77 +271,7 @@ func (p *Analyze) matchProperty(_ *requiredProperty, childPlanInfo ...*physicalP for _, res := range childPlanInfo { children = append(children, res.p) } - np := *p + np := p.Copy() np.SetChildren(children...) - return &physicalPlanInfo{p: &np} -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Cache) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *MaxOneRow) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Exists) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *PhysicalAggregation) matchProperty(prop *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Limit) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *TableDual) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Sort) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Insert) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *SelectLock) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Update) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *PhysicalDummyScan) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Delete) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *Show) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") -} - -// matchProperty implements PhysicalPlan matchProperty interface. -func (p *PhysicalMemTable) matchProperty(_ *requiredProperty, _ ...*physicalPlanInfo) *physicalPlanInfo { - panic("You can't call this function!") + return &physicalPlanInfo{p: np} } diff --git a/plan/physical_plan_builder.go b/plan/physical_plan_builder.go index c893831927..9b62d1477a 100644 --- a/plan/physical_plan_builder.go +++ b/plan/physical_plan_builder.go @@ -42,17 +42,14 @@ var JoinConcurrency = 5 func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInfo, error) { client := p.ctx.GetClient() - ts := &PhysicalTableScan{ + ts := PhysicalTableScan{ Table: p.tableInfo, Columns: p.Columns, TableAsName: p.TableAsName, DBName: p.DBName, physicalTableSource: physicalTableSource{client: client}, - } - ts.tp = TypeTableScan - ts.allocator = p.allocator + }.init(p.allocator, p.ctx) ts.SetSchema(p.Schema()) - ts.initIDAndContext(p.ctx) if p.ctx.Txn() != nil { ts.readOnly = p.ctx.Txn().IsReadOnly() } else { @@ -64,7 +61,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf table := p.tableInfo sc := p.ctx.GetSessionVars().StmtCtx if sel, ok := p.parents[0].(*Selection); ok { - newSel := *sel + newSel := sel.Copy().(*Selection) conds := make([]expression.Expression, 0, len(sel.Conditions)) for _, cond := range sel.Conditions { conds = append(conds, cond.Clone()) @@ -80,7 +77,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf if len(newSel.Conditions) > 0 { newSel.SetChildren(ts) newSel.onTable = true - resultPlan = &newSel + resultPlan = newSel } } else { ts.Ranges = []TableRange{{math.MinInt64, math.MaxInt64}} @@ -115,7 +112,7 @@ func (p *DataSource) convert2TableScan(prop *requiredProperty) (*physicalPlanInf func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.IndexInfo) (*physicalPlanInfo, error) { client := p.ctx.GetClient() - is := &PhysicalIndexScan{ + is := PhysicalIndexScan{ Index: index, Table: p.tableInfo, Columns: p.Columns, @@ -123,10 +120,7 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde OutOfOrder: true, DBName: p.DBName, physicalTableSource: physicalTableSource{client: client}, - } - is.tp = TypeIdxScan - is.allocator = p.allocator - is.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) is.SetSchema(p.schema) if p.ctx.Txn() != nil { is.readOnly = p.ctx.Txn().IsReadOnly() @@ -140,7 +134,7 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde rowCount := uint64(statsTbl.Count) sc := p.ctx.GetSessionVars().StmtCtx if sel, ok := p.parents[0].(*Selection); ok { - newSel := *sel + newSel := sel.Copy().(*Selection) conds := make([]expression.Expression, 0, len(sel.Conditions)) for _, cond := range sel.Conditions { conds = append(conds, cond.Clone()) @@ -168,7 +162,7 @@ func (p *DataSource) convert2IndexScan(prop *requiredProperty, index *model.Inde if len(newSel.Conditions) > 0 { newSel.SetChildren(is) newSel.onTable = true - resultPlan = &newSel + resultPlan = newSel } } else { rb := rangeBuilder{sc: p.ctx.GetSessionVars().StmtCtx} @@ -225,15 +219,12 @@ func (p *DataSource) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlan memDB := infoschema.IsMemoryDB(p.DBName.L) isDistReq := !memDB && client != nil && client.SupportRequestType(kv.ReqTypeSelect, 0) if !isDistReq { - memTable := &PhysicalMemTable{ + memTable := PhysicalMemTable{ DBName: p.DBName, Table: p.tableInfo, Columns: p.Columns, TableAsName: p.TableAsName, - } - memTable.tp = TypeMemTableScan - memTable.allocator = p.allocator - memTable.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) memTable.SetSchema(p.schema) rb := &rangeBuilder{sc: p.ctx.GetSessionVars().StmtCtx} memTable.Ranges = rb.buildTableRanges(fullRange) @@ -278,10 +269,7 @@ func (p *DataSource) tryToConvert2DummyScan(prop *requiredProperty) (*physicalPl return nil, errors.Trace(err) } if !result { - dummy := &PhysicalDummyScan{} - dummy.tp = TypeDummy - dummy.allocator = p.allocator - dummy.initIDAndContext(p.ctx) + dummy := PhysicalDummyScan{}.init(p.allocator, p.ctx) dummy.SetSchema(p.schema) info := &physicalPlanInfo{p: dummy} p.storePlanInfo(prop, info) @@ -310,13 +298,10 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl for _, col := range prop.props { items = append(items, &ByItems{Expr: col.col, Desc: col.desc}) } - sort := &Sort{ + sort := Sort{ ByItems: items, ExecLimit: prop.limit, - } - sort.tp = TypeSort - sort.allocator = info.p.Allocator() - sort.initIDAndContext(info.p.context()) + }.init(info.p.Allocator(), info.p.context()) sort.SetSchema(info.p.Schema()) info = addPlanToResponse(sort, info) @@ -326,10 +311,7 @@ func enforceProperty(prop *requiredProperty, info *physicalPlanInfo) *physicalPl } info.cost += sortCost(count) } else if prop.limit != nil { - limit := prop.limit.Copy().(*Limit) - limit.tp = TypeLimit - limit.allocator = info.p.Allocator() - limit.initIDAndContext(info.p.context()) + limit := Limit{Offset: prop.limit.Offset, Count: prop.limit.Count}.init(info.p.Allocator(), info.p.context()) limit.SetSchema(info.p.Schema()) info = addPlanToResponse(limit, info) } @@ -409,18 +391,14 @@ func (p *Join) convert2PhysicalPlanSemi(prop *requiredProperty) (*physicalPlanIn allLeft = false } } - join := &PhysicalHashSemiJoin{ + join := PhysicalHashSemiJoin{ WithAux: LeftOuterSemiJoin == p.JoinType, EqualConditions: p.EqualConditions, LeftConditions: p.LeftConditions, RightConditions: p.RightConditions, OtherConditions: p.OtherConditions, Anti: p.anti, - } - join.ctx = p.ctx - join.tp = TypeHashSemiJoin - join.allocator = p.allocator - join.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) join.SetSchema(p.schema) lProp := prop if !allLeft { @@ -461,7 +439,7 @@ func (p *Join) convert2PhysicalPlanLeft(prop *requiredProperty, innerJoin bool) allLeft = false } } - join := &PhysicalHashJoin{ + join := PhysicalHashJoin{ EqualConditions: p.EqualConditions, LeftConditions: p.LeftConditions, RightConditions: p.RightConditions, @@ -470,10 +448,7 @@ func (p *Join) convert2PhysicalPlanLeft(prop *requiredProperty, innerJoin bool) // TODO: decide concurrency by data size. Concurrency: JoinConcurrency, DefaultValues: p.DefaultValues, - } - join.tp = TypeHashLeftJoin - join.allocator = p.allocator - join.initIDAndContext(lChild.context()) + }.init(p.allocator, p.ctx) join.SetSchema(p.schema) if innerJoin { join.JoinType = InnerJoin @@ -534,7 +509,7 @@ func (p *Join) convert2PhysicalPlanRight(prop *requiredProperty, innerJoin bool) allRight = false } } - join := &PhysicalHashJoin{ + join := PhysicalHashJoin{ EqualConditions: p.EqualConditions, LeftConditions: p.LeftConditions, RightConditions: p.RightConditions, @@ -542,10 +517,7 @@ func (p *Join) convert2PhysicalPlanRight(prop *requiredProperty, innerJoin bool) // TODO: decide concurrency by data size. Concurrency: JoinConcurrency, DefaultValues: p.DefaultValues, - } - join.tp = TypeHashRightJoin - join.allocator = p.allocator - join.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) join.SetSchema(p.schema) if innerJoin { join.JoinType = InnerJoin @@ -649,7 +621,7 @@ func (p *Join) convert2PhysicalMergeJoin(parentProp *requiredProperty, lProp *re otherFilter := append(expression.ScalarFuncs2Exprs(newEQConds), p.OtherConditions...) - join := &PhysicalMergeJoin{ + join := PhysicalMergeJoin{ EqualConditions: []*expression.ScalarFunction{eqCond}, LeftConditions: p.LeftConditions, RightConditions: p.RightConditions, @@ -657,10 +629,7 @@ func (p *Join) convert2PhysicalMergeJoin(parentProp *requiredProperty, lProp *re DefaultValues: p.DefaultValues, // Assume order for both side are the same Desc: lProp.props[0].desc, - } - join.tp = TypeMergeJoin - join.allocator = p.allocator - join.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) join.SetSchema(p.schema) join.JoinType = joinType @@ -831,14 +800,11 @@ func (p *Aggregation) convert2PhysicalPlanStream(prop *requiredProperty) (*physi return &physicalPlanInfo{cost: math.MaxFloat64}, nil } } - agg := &PhysicalAggregation{ + agg := PhysicalAggregation{ AggType: StreamedAgg, AggFuncs: p.AggFuncs, GroupByItems: p.GroupByItems, - } - agg.tp = TypeStreamAgg - agg.allocator = p.allocator - agg.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) agg.HasGby = len(p.GroupByItems) > 0 agg.SetSchema(p.schema) // TODO: Consider distinct key. @@ -879,14 +845,11 @@ func (p *Aggregation) convert2PhysicalPlanStream(prop *requiredProperty) (*physi // convert2PhysicalPlanFinalHash converts the logical aggregation to the final hash aggregation *physicalPlanInfo. func (p *Aggregation) convert2PhysicalPlanFinalHash(x physicalDistSQLPlan, childInfo *physicalPlanInfo) *physicalPlanInfo { - agg := &PhysicalAggregation{ + agg := PhysicalAggregation{ AggType: FinalAgg, AggFuncs: p.AggFuncs, GroupByItems: p.GroupByItems, - } - agg.tp = TypeHashAgg - agg.allocator = p.allocator - agg.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) agg.SetSchema(p.schema) agg.HasGby = len(p.GroupByItems) > 0 schema := x.addAggregation(p.ctx, agg) @@ -903,14 +866,11 @@ func (p *Aggregation) convert2PhysicalPlanFinalHash(x physicalDistSQLPlan, child // convert2PhysicalPlanCompleteHash converts the logical aggregation to the complete hash aggregation *physicalPlanInfo. func (p *Aggregation) convert2PhysicalPlanCompleteHash(childInfo *physicalPlanInfo) *physicalPlanInfo { - agg := &PhysicalAggregation{ + agg := PhysicalAggregation{ AggType: CompleteAgg, AggFuncs: p.AggFuncs, GroupByItems: p.GroupByItems, - } - agg.tp = TypeHashAgg - agg.allocator = p.allocator - agg.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) agg.HasGby = len(p.GroupByItems) > 0 agg.SetSchema(p.schema) info := addPlanToResponse(agg, childInfo) @@ -1046,17 +1006,14 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool) } return nil, false } - ts := &PhysicalTableScan{ + ts := PhysicalTableScan{ Table: ds.tableInfo, Columns: ds.Columns, TableAsName: ds.TableAsName, DBName: ds.DBName, physicalTableSource: physicalTableSource{client: ds.ctx.GetClient()}, - } - ts.tp = TypeTableScan - ts.allocator = ds.allocator + }.init(p.allocator, p.ctx) ts.SetSchema(ds.schema) - ts.initIDAndContext(ds.ctx) if ds.ctx.Txn() != nil { ts.readOnly = p.ctx.Txn().IsReadOnly() } else { @@ -1077,7 +1034,7 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool) for _, cond := range corColConds { condsBackUp = append(condsBackUp, cond.Clone()) } - is := &PhysicalIndexScan{ + is := PhysicalIndexScan{ Table: ds.tableInfo, Index: idx, Columns: ds.Columns, @@ -1085,11 +1042,8 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool) OutOfOrder: true, DBName: ds.DBName, physicalTableSource: physicalTableSource{client: ds.ctx.GetClient()}, - } - is.tp = TypeIdxScan - is.allocator = ds.allocator + }.init(p.allocator, p.ctx) is.SetSchema(ds.schema) - is.initIDAndContext(ds.ctx) if is.ctx.Txn() != nil { is.readOnly = p.ctx.Txn().IsReadOnly() } else { @@ -1114,11 +1068,11 @@ func (p *Selection) makeScanController(onlyJudge bool) (*physicalPlanInfo, bool) } child = chosenPlan } - newSel := *p + newSel := p.Copy().(*Selection) newSel.ScanController = true newSel.SetChildren(child) info := &physicalPlanInfo{ - p: &newSel, + p: newSel, count: uint64(ds.statisticTable.Count), } info.cost = float64(info.count) * selectionFactor @@ -1161,10 +1115,10 @@ func (p *Selection) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanI } func (p *Selection) appendSelToInfo(info *physicalPlanInfo) *physicalPlanInfo { - np := *p + np := p.Copy().(*Selection) np.SetChildren(info.p) return &physicalPlanInfo{ - p: &np, + p: np, cost: info.cost, count: uint64(float64(info.count) * selectionFactor), } @@ -1336,9 +1290,9 @@ func (p *Sort) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, sortedPlanInfo = addPlanToResponse(np, sortedPlanInfo) } else if sortCost+unSortedPlanInfo.cost < sortedPlanInfo.cost { sortedPlanInfo.cost = sortCost + unSortedPlanInfo.cost - np := *p + np := p.Copy().(*Sort) np.ExecLimit = selfProp.limit - sortedPlanInfo = addPlanToResponse(&np, unSortedPlanInfo) + sortedPlanInfo = addPlanToResponse(np, unSortedPlanInfo) } if !matchProp(p.ctx, prop, selfProp) { sortedPlanInfo.cost = math.MaxFloat64 @@ -1366,13 +1320,10 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } switch info.p.(type) { case *PhysicalHashJoin, *PhysicalHashSemiJoin: - ap := &PhysicalApply{ + ap := PhysicalApply{ PhysicalJoin: info.p, OuterSchema: p.corCols, - } - ap.tp = TypeApply - ap.allocator = p.allocator - ap.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) ap.SetChildren(info.p.Children()...) ap.SetSchema(info.p.Schema()) info.p = ap @@ -1386,17 +1337,14 @@ func (p *Apply) convert2PhysicalPlan(prop *requiredProperty) (*physicalPlanInfo, } func (p *Analyze) prepareSimpleTableScan(cols []*model.ColumnInfo) *PhysicalTableScan { - ts := &PhysicalTableScan{ + ts := PhysicalTableScan{ Table: p.Table.TableInfo, Columns: cols, TableAsName: &p.Table.Name, DBName: p.Table.DBInfo.Name, physicalTableSource: physicalTableSource{client: p.ctx.GetClient()}, - } - ts.tp = TypeTableScan - ts.allocator = p.allocator + }.init(p.allocator, p.ctx) ts.SetSchema(expression.NewSchema(expression.ColumnInfos2Columns(ts.Table.Name, cols)...)) - ts.initIDAndContext(p.ctx) ts.readOnly = true ts.Ranges = []TableRange{{math.MinInt64, math.MaxInt64}} return ts @@ -1404,7 +1352,7 @@ func (p *Analyze) prepareSimpleTableScan(cols []*model.ColumnInfo) *PhysicalTabl func (p *Analyze) prepareSimpleIndexScan(idxOffset int, cols []*model.ColumnInfo) *PhysicalIndexScan { tblInfo := p.Table.TableInfo - is := &PhysicalIndexScan{ + is := PhysicalIndexScan{ Index: tblInfo.Indices[idxOffset], Table: tblInfo, Columns: cols, @@ -1413,10 +1361,7 @@ func (p *Analyze) prepareSimpleIndexScan(idxOffset int, cols []*model.ColumnInfo DBName: p.Table.DBInfo.Name, physicalTableSource: physicalTableSource{client: p.ctx.GetClient()}, DoubleRead: false, - } - is.tp = TypeAnalyze - is.allocator = p.allocator - is.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) is.SetSchema(expression.NewSchema(expression.ColumnInfos2Columns(tblInfo.Name, cols)...)) is.readOnly = true rb := rangeBuilder{sc: p.ctx.GetSessionVars().StmtCtx} @@ -1485,10 +1430,7 @@ func addCachePlan(p PhysicalPlan, allocator *idAllocator) []*expression.Correlat childCorCols := addCachePlan(child.(PhysicalPlan), allocator) // If p is a Selection and controls the access condition of below scan plan, there shouldn't have a cache plan. if sel, ok := p.(*Selection); len(selfCorCols) > 0 && len(childCorCols) == 0 && (!ok || !sel.ScanController) { - newChild := &Cache{} - newChild.tp = TypeCache - newChild.allocator = allocator - newChild.initIDAndContext(p.context()) + newChild := Cache{}.init(p.Allocator(), p.context()) newChild.SetSchema(child.Schema()) addChild(newChild, child) diff --git a/plan/physical_plans.go b/plan/physical_plans.go index 8083199a37..57abf5511e 100644 --- a/plan/physical_plans.go +++ b/plan/physical_plans.go @@ -36,6 +36,33 @@ var ( _ physicalDistSQLPlan = &PhysicalIndexScan{} ) +var ( + _ PhysicalPlan = &Selection{} + _ PhysicalPlan = &Projection{} + _ PhysicalPlan = &Exists{} + _ PhysicalPlan = &MaxOneRow{} + _ PhysicalPlan = &TableDual{} + _ PhysicalPlan = &Union{} + _ PhysicalPlan = &Sort{} + _ PhysicalPlan = &Update{} + _ PhysicalPlan = &Delete{} + _ PhysicalPlan = &SelectLock{} + _ PhysicalPlan = &Limit{} + _ PhysicalPlan = &Show{} + _ PhysicalPlan = &Insert{} + _ PhysicalPlan = &Analyze{} + _ PhysicalPlan = &PhysicalIndexScan{} + _ PhysicalPlan = &PhysicalTableScan{} + _ PhysicalPlan = &PhysicalAggregation{} + _ PhysicalPlan = &PhysicalApply{} + _ PhysicalPlan = &PhysicalDummyScan{} + _ PhysicalPlan = &PhysicalHashJoin{} + _ PhysicalPlan = &PhysicalHashSemiJoin{} + _ PhysicalPlan = &PhysicalMergeJoin{} + _ PhysicalPlan = &PhysicalUnionScan{} + _ PhysicalPlan = &Cache{} +) + // PhysicalIndexScan represents an index scan plan. type PhysicalIndexScan struct { physicalTableSource @@ -61,7 +88,8 @@ type PhysicalIndexScan struct { // PhysicalMemTable reads memory table. type PhysicalMemTable struct { - basePlan + *basePlan + basePhysicalPlan DBName model.CIStr Table *model.TableInfo @@ -72,7 +100,10 @@ type PhysicalMemTable struct { // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalMemTable) Copy() PhysicalPlan { - return &(*p) + np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) + return &np } // physicalDistSQLPlan means the plan that can be executed distributively. @@ -114,7 +145,8 @@ func (p *PhysicalTableScan) calculateCost(resultCount uint64, scanCount uint64) } type physicalTableSource struct { - basePlan + *basePlan + basePhysicalPlan client kv.Client @@ -217,12 +249,9 @@ func (p *physicalTableSource) tryToAddUnionScan(resultPlan PhysicalPlan) Physica return resultPlan } conditions := append(p.indexFilterConditions, p.tableFilterConditions...) - us := &PhysicalUnionScan{ + us := PhysicalUnionScan{ Condition: expression.ComposeCNFCondition(p.ctx, append(conditions, p.AccessCondition...)...), - } - us.tp = TypeUnionScan - us.allocator = p.allocator - us.initIDAndContext(p.ctx) + }.init(p.allocator, p.ctx) us.SetChildren(resultPlan) us.SetSchema(resultPlan.Schema()) return us @@ -342,12 +371,14 @@ type PhysicalTableScan struct { // PhysicalDummyScan is a dummy table that returns nothing. type PhysicalDummyScan struct { - basePlan + *basePlan + basePhysicalPlan } // PhysicalApply represents apply plan, only used for subquery. type PhysicalApply struct { - basePlan + *basePlan + basePhysicalPlan PhysicalJoin PhysicalPlan OuterSchema []*expression.CorrelatedColumn @@ -355,7 +386,8 @@ type PhysicalApply struct { // PhysicalHashJoin represents hash join for inner/ outer join. type PhysicalHashJoin struct { - basePlan + *basePlan + basePhysicalPlan JoinType JoinType @@ -371,7 +403,8 @@ type PhysicalHashJoin struct { // PhysicalMergeJoin represents merge join for inner/ outer join. type PhysicalMergeJoin struct { - basePlan + *basePlan + basePhysicalPlan JoinType JoinType @@ -386,7 +419,8 @@ type PhysicalMergeJoin struct { // PhysicalHashSemiJoin represents hash join for semi join. type PhysicalHashSemiJoin struct { - basePlan + *basePlan + basePhysicalPlan WithAux bool Anti bool @@ -411,7 +445,8 @@ const ( // PhysicalAggregation is Aggregation's physical plan. type PhysicalAggregation struct { - basePlan + *basePlan + basePhysicalPlan HasGby bool AggType AggregationType @@ -421,14 +456,16 @@ type PhysicalAggregation struct { // PhysicalUnionScan represents a union scan operator. type PhysicalUnionScan struct { - basePlan + *basePlan + basePhysicalPlan Condition expression.Expression } // Cache plan is a physical plan which stores the result of its child node. type Cache struct { - basePlan + *basePlan + basePhysicalPlan } func (p *PhysicalMergeJoin) tryConsumeOrder(prop *requiredProperty, eqCond *expression.ScalarFunction) *requiredProperty { @@ -530,6 +567,8 @@ func (p *PhysicalAggregation) extractCorrelatedCols() []*expression.CorrelatedCo // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalIndexScan) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -564,6 +603,8 @@ func (p *PhysicalIndexScan) IsPointGetByUniqueKey(sc *variable.StatementContext) // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalTableScan) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -596,6 +637,8 @@ func (p *PhysicalMemTable) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalApply) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -616,6 +659,8 @@ func (p *PhysicalApply) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalHashSemiJoin) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -657,12 +702,16 @@ func (p *PhysicalHashSemiJoin) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalHashJoin) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalMergeJoin) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -736,6 +785,9 @@ func (p *PhysicalMergeJoin) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *Selection) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -756,6 +808,9 @@ func (p *Selection) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *Projection) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -775,24 +830,36 @@ func (p *Projection) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *Exists) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *MaxOneRow) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *Insert) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *Limit) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -810,6 +877,9 @@ func (p *Limit) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *Union) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -843,18 +913,26 @@ func (p *Sort) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *TableDual) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *SelectLock) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalAggregation) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } @@ -879,41 +957,59 @@ func (p *PhysicalAggregation) MarshalJSON() ([]byte, error) { // Copy implements the PhysicalPlan Copy interface. func (p *Update) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalDummyScan) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *Delete) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *Show) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *PhysicalUnionScan) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the PhysicalPlan Copy interface. func (p *Cache) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } // Copy implements the Analyze Copy interface. func (p *Analyze) Copy() PhysicalPlan { np := *p + np.basePlan = p.basePlan.copy() + np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan) + np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan) return &np } diff --git a/plan/plan.go b/plan/plan.go index d9f4d0c7ac..b55298c55e 100644 --- a/plan/plan.go +++ b/plan/plan.go @@ -214,9 +214,12 @@ type PhysicalPlan interface { } type baseLogicalPlan struct { - basePlan - planMap map[string]*physicalPlanInfo - self LogicalPlan + basePlan *basePlan + planMap map[string]*physicalPlanInfo +} + +type basePhysicalPlan struct { + basePlan *basePlan } func (p *baseLogicalPlan) getPlanInfo(prop *requiredProperty) (*physicalPlanInfo, error) { @@ -235,15 +238,15 @@ func (p *baseLogicalPlan) convert2PhysicalPlan(prop *requiredProperty) (*physica if info != nil { return info, nil } - if len(p.children) == 0 { - return &physicalPlanInfo{p: p.self.(PhysicalPlan)}, nil + if len(p.basePlan.children) == 0 { + return &physicalPlanInfo{p: p.basePlan.self.(PhysicalPlan)}, nil } - child := p.children[0].(LogicalPlan) + child := p.basePlan.children[0].(LogicalPlan) info, err = child.convert2PhysicalPlan(prop) if err != nil { return nil, errors.Trace(err) } - info = addPlanToResponse(p.self.(PhysicalPlan), info) + info = addPlanToResponse(p.basePlan.self.(PhysicalPlan), info) return info, p.storePlanInfo(prop, info) } @@ -258,50 +261,67 @@ func (p *baseLogicalPlan) storePlanInfo(prop *requiredProperty, info *physicalPl } func (p *baseLogicalPlan) buildKeyInfo() { - for _, child := range p.Children() { + for _, child := range p.basePlan.children { child.(LogicalPlan).buildKeyInfo() } - if len(p.children) == 1 { - switch p.self.(type) { + if len(p.basePlan.children) == 1 { + switch p.basePlan.self.(type) { case *Exists, *Aggregation, *Projection: - p.schema.Keys = nil + p.basePlan.schema.Keys = nil case *SelectLock: - p.schema.Keys = p.children[0].Schema().Keys + p.basePlan.schema.Keys = p.basePlan.children[0].Schema().Keys default: - p.schema.Keys = p.children[0].Schema().Clone().Keys + p.basePlan.schema.Keys = p.basePlan.children[0].Schema().Clone().Keys } } else { - p.schema.Keys = nil + p.basePlan.schema.Keys = nil } } -func newBaseLogicalPlan(tp string, a *idAllocator) baseLogicalPlan { - return baseLogicalPlan{ - planMap: make(map[string]*physicalPlanInfo), - basePlan: basePlan{ - tp: tp, - allocator: a, - }, +func newBasePlan(tp string, allocator *idAllocator, ctx context.Context, p Plan) *basePlan { + return &basePlan{ + tp: tp, + allocator: allocator, + id: tp + allocator.allocID(), + ctx: ctx, + self: p, } } +func newBaseLogicalPlan(basePlan *basePlan) baseLogicalPlan { + return baseLogicalPlan{ + planMap: make(map[string]*physicalPlanInfo), + basePlan: basePlan, + } +} + +func newBasePhysicalPlan(basePlan *basePlan) basePhysicalPlan { + return basePhysicalPlan{ + basePlan: basePlan, + } +} + +func (p *basePhysicalPlan) matchProperty(prop *requiredProperty, childPlanInfo ...*physicalPlanInfo) *physicalPlanInfo { + panic("You can't call this function!") +} + // PredicatePushDown implements LogicalPlan interface. func (p *baseLogicalPlan) PredicatePushDown(predicates []expression.Expression) ([]expression.Expression, LogicalPlan, error) { - if len(p.Children()) == 0 { - return predicates, p.self, nil + if len(p.basePlan.children) == 0 { + return predicates, p.basePlan.self.(LogicalPlan), nil } - child := p.children[0].(LogicalPlan) + child := p.basePlan.children[0].(LogicalPlan) rest, _, err := child.PredicatePushDown(predicates) if err != nil { return nil, nil, errors.Trace(err) } if len(rest) > 0 { - err = addSelection(p, child, rest, p.allocator) + err = addSelection(p.basePlan.self, child, rest, p.basePlan.allocator) if err != nil { return nil, nil, errors.Trace(err) } } - return nil, p.self, nil + return nil, p.basePlan.self.(LogicalPlan), nil } func (p *basePlan) extractCorrelatedCols() []*expression.CorrelatedColumn { @@ -318,24 +338,19 @@ func (p *basePlan) Allocator() *idAllocator { // ResolveIndicesAndCorCols implements LogicalPlan interface. func (p *baseLogicalPlan) ResolveIndicesAndCorCols() { - for _, child := range p.children { + for _, child := range p.basePlan.children { child.(LogicalPlan).ResolveIndicesAndCorCols() } } // PruneColumns implements LogicalPlan interface. func (p *baseLogicalPlan) PruneColumns(parentUsedCols []*expression.Column) { - if len(p.children) == 0 { + if len(p.basePlan.children) == 0 { return } - child := p.children[0].(LogicalPlan) + child := p.basePlan.children[0].(LogicalPlan) child.PruneColumns(parentUsedCols) - p.SetSchema(child.Schema()) -} - -func (p *basePlan) initIDAndContext(ctx context.Context) { - p.id = p.tp + p.allocator.allocID() - p.ctx = ctx + p.basePlan.SetSchema(child.Schema()) } // basePlan implements base Plan interface. @@ -349,6 +364,12 @@ type basePlan struct { id string allocator *idAllocator ctx context.Context + self Plan +} + +func (p *basePlan) copy() *basePlan { + np := *p + return &np } // MarshalJSON implements json.Marshaler interface. diff --git a/plan/planbuilder.go b/plan/planbuilder.go index 6dda060e98..9138498225 100644 --- a/plan/planbuilder.go +++ b/plan/planbuilder.go @@ -167,10 +167,7 @@ func (b *planBuilder) buildExecute(v *ast.ExecuteStmt) Plan { func (b *planBuilder) buildDo(v *ast.DoStmt) Plan { exprs := make([]expression.Expression, 0, len(v.Exprs)) - dual := &TableDual{ - baseLogicalPlan: newBaseLogicalPlan(TypeDual, b.allocator), - } - dual.self = dual + dual := TableDual{}.init(b.allocator, b.ctx) for _, astExpr := range v.Exprs { expr, _, err := b.rewrite(astExpr, dual, nil, true) if err != nil { @@ -180,11 +177,7 @@ func (b *planBuilder) buildDo(v *ast.DoStmt) Plan { exprs = append(exprs, expr) } dual.SetSchema(expression.NewSchema()) - p := &Projection{ - Exprs: exprs, - baseLogicalPlan: newBaseLogicalPlan(TypeProj, b.allocator), - } - p.initIDAndContext(b.ctx) + p := Projection{Exprs: exprs}.init(b.allocator, b.ctx) addChild(p, dual) p.self = p p.SetSchema(expression.NewSchema()) @@ -193,8 +186,6 @@ func (b *planBuilder) buildDo(v *ast.DoStmt) Plan { func (b *planBuilder) buildSet(v *ast.SetStmt) Plan { p := &Set{} - p.tp = TypeSet - p.allocator = b.allocator for _, vars := range v.Variables { assign := &expression.VarAssignment{ Name: vars.Name, @@ -217,7 +208,6 @@ func (b *planBuilder) buildSet(v *ast.SetStmt) Plan { } p.VarAssigns = append(p.VarAssigns, assign) } - p.initIDAndContext(b.ctx) p.SetSchema(expression.NewSchema()) return p } @@ -321,12 +311,7 @@ func findIndexByName(indices []*model.IndexInfo, name model.CIStr) *model.IndexI } func (b *planBuilder) buildSelectLock(src Plan, lock ast.SelectLockType) *SelectLock { - selectLock := &SelectLock{ - Lock: lock, - baseLogicalPlan: newBaseLogicalPlan(TypeLock, b.allocator), - } - selectLock.self = selectLock - selectLock.initIDAndContext(b.ctx) + selectLock := SelectLock{Lock: lock}.init(b.allocator, b.ctx) addChild(selectLock, src) selectLock.SetSchema(src.Schema()) return selectLock @@ -404,26 +389,18 @@ func getColumnOffsets(tn *ast.TableName) (indexOffsets []int, columnOffsets []in } func (b *planBuilder) buildAnalyze(as *ast.AnalyzeTableStmt) LogicalPlan { - p := &Analyze{ - baseLogicalPlan: newBaseLogicalPlan(TypeAnalyze, b.allocator), - PkOffset: -1, - } + p := Analyze{PkOffset: -1}.init(b.allocator, b.ctx) for _, tbl := range as.TableNames { idxOffsets, colOffsets, pkOffset := getColumnOffsets(tbl) - result := &Analyze{ - baseLogicalPlan: newBaseLogicalPlan(TypeAnalyze, b.allocator), - Table: tbl, - IdxOffsets: idxOffsets, - ColOffsets: colOffsets, - PkOffset: pkOffset, - } - result.self = result - result.initIDAndContext(b.ctx) + result := Analyze{ + Table: tbl, + IdxOffsets: idxOffsets, + ColOffsets: colOffsets, + PkOffset: pkOffset, + }.init(b.allocator, b.ctx) result.SetSchema(expression.TableInfo2Schema(tbl.TableInfo)) addChild(p, result) } - p.self = p - p.initIDAndContext(b.ctx) p.SetSchema(&expression.Schema{}) return p } @@ -486,19 +463,16 @@ func splitWhere(where ast.ExprNode) []ast.ExprNode { func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan { var resultPlan Plan - p := &Show{ - Tp: show.Tp, - DBName: show.DBName, - Table: show.Table, - Column: show.Column, - Flag: show.Flag, - Full: show.Full, - User: show.User, - baseLogicalPlan: newBaseLogicalPlan(TypeShow, b.allocator), - } + p := Show{ + Tp: show.Tp, + DBName: show.DBName, + Table: show.Table, + Column: show.Column, + Flag: show.Flag, + Full: show.Full, + User: show.User, + }.init(b.allocator, b.ctx) resultPlan = p - p.initIDAndContext(b.ctx) - p.self = p switch show.Tp { case ast.ShowProcedureStatus: p.SetSchema(buildShowProcedureSchema()) @@ -535,12 +509,7 @@ func (b *planBuilder) buildShow(show *ast.ShowStmt) Plan { } } if len(conditions) != 0 { - sel := &Selection{ - baseLogicalPlan: newBaseLogicalPlan(TypeSel, b.allocator), - Conditions: conditions, - } - sel.initIDAndContext(b.ctx) - sel.self = sel + sel := Selection{Conditions: conditions}.init(b.allocator, b.ctx) addChild(sel, p) sel.SetSchema(p.Schema()) resultPlan = sel @@ -631,15 +600,14 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan { b.err = errors.Errorf("Can't get table %s.", tableInfo.Name.O) return nil } - insertPlan := &Insert{ - Table: table, - Columns: insert.Columns, - tableSchema: schema, - IsReplace: insert.IsReplace, - Priority: insert.Priority, - Ignore: insert.Ignore, - baseLogicalPlan: newBaseLogicalPlan(TypeInsert, b.allocator), - } + insertPlan := Insert{ + Table: table, + Columns: insert.Columns, + tableSchema: schema, + IsReplace: insert.IsReplace, + Priority: insert.Priority, + Ignore: insert.Ignore, + }.init(b.allocator, b.ctx) b.visitInfo = append(b.visitInfo, visitInfo{ privilege: mysql.InsertPriv, @@ -696,7 +664,7 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan { Expr: expr, }) } - mockTablePlan := &TableDual{} + mockTablePlan := TableDual{}.init(b.allocator, b.ctx) mockTablePlan.SetSchema(schema) for _, assign := range insert.OnDuplicate { col, err := schema.FindColumn(assign.Column) @@ -718,8 +686,6 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan { Expr: expr, }) } - insertPlan.initIDAndContext(b.ctx) - insertPlan.self = insertPlan if insert.Select != nil { selectPlan := b.build(insert.Select) if b.err != nil { diff --git a/plan/plans.go b/plan/plans.go index 9a7e3c297a..8836c86501 100644 --- a/plan/plans.go +++ b/plan/plans.go @@ -108,14 +108,18 @@ func (ir *IndexRange) String() string { // SelectLock represents a select lock plan. type SelectLock struct { + *basePlan baseLogicalPlan + basePhysicalPlan Lock ast.SelectLockType } // Limit represents offset and limit plan. type Limit struct { + *basePlan baseLogicalPlan + basePhysicalPlan Offset uint64 Count uint64 @@ -147,7 +151,9 @@ type Deallocate struct { // Show represents a show plan. type Show struct { + *basePlan baseLogicalPlan + basePhysicalPlan Tp ast.ShowStmtType // Databases/Tables/Columns/.... DBName string @@ -177,7 +183,9 @@ type Simple struct { // Insert represents an insert plan. type Insert struct { + *basePlan baseLogicalPlan + basePhysicalPlan Table table.Table tableSchema *expression.Schema @@ -193,7 +201,9 @@ type Insert struct { // Analyze represents an analyze plan type Analyze struct { + *basePlan baseLogicalPlan + basePhysicalPlan Table *ast.TableName IdxOffsets []int diff --git a/plan/predicate_push_down.go b/plan/predicate_push_down.go index 8a62efd97d..bc70eb320f 100644 --- a/plan/predicate_push_down.go +++ b/plan/predicate_push_down.go @@ -27,11 +27,7 @@ func (s *ppdSolver) optimize(lp LogicalPlan, _ context.Context, _ *idAllocator) func addSelection(p Plan, child LogicalPlan, conditions []expression.Expression, allocator *idAllocator) error { conditions = expression.PropagateConstant(p.context(), conditions) - selection := &Selection{ - Conditions: conditions, - baseLogicalPlan: newBaseLogicalPlan(TypeSel, allocator)} - selection.self = selection - selection.initIDAndContext(p.context()) + selection := Selection{Conditions: conditions}.init(allocator, p.context()) selection.SetSchema(child.Schema().Clone()) return InsertPlan(p, child, selection) } @@ -71,7 +67,7 @@ func (p *Join) PredicatePushDown(predicates []expression.Expression) (ret []expr } groups, valid := tryToGetJoinGroup(p) if valid { - e := joinReOrderSolver{allocator: p.allocator} + e := joinReOrderSolver{allocator: p.allocator, ctx: p.ctx} e.reorderJoin(groups, predicates) newJoin := e.resultJoin parent := p.parents[0] diff --git a/plan/task_profile.go b/plan/task_profile.go index 5dd5b25422..0633a8573f 100644 --- a/plan/task_profile.go +++ b/plan/task_profile.go @@ -184,12 +184,7 @@ func (sel *Selection) splitSelectionByIndexColumns(schema *expression.Schema) (i indexSel = sel } if len(tableConds) != 0 { - tableSel = &Selection{ - baseLogicalPlan: newBaseLogicalPlan(TypeSel, sel.allocator), - Conditions: tableConds, - } - tableSel.self = tableSel - tableSel.initIDAndContext(sel.ctx) + tableSel = Selection{Conditions: tableConds}.init(sel.allocator, sel.ctx) } return }